// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vorc_reader.h"

#include <cctz/civil_time_detail.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>

#include <algorithm>
#include <cctype>
#include <list>

#include "vec/exprs/vdirect_in_predicate.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/exprs/vtopn_pred.h"

// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <exception>
#include <iterator>
#include <map>
#include <memory>
#include <ostream>
#include <tuple>
#include <utility>

#include "absl/strings/substitute.h"
#include "cctz/civil_time.h"
#include "cctz/time_zone.h"
#include "common/exception.h"
#include "exprs/create_predicate_function.h"
#include "exprs/hybrid_set.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "olap/id_manager.h"
#include "olap/utils.h"
#include "orc/Exceptions.hh"
#include "orc/Int128.hh"
#include "orc/MemoryPool.hh"
#include "orc/OrcFile.hh"
#include "orc/sargs/Literal.hh"
#include "orc/sargs/SearchArgument.hh"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "util/timezone_utils.h"
#include "vec/columns/column.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_struct.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/exec/format/orc/orc_file_reader.h"
#include "vec/exec/format/table/transactional_hive_common.h"
#include "vec/exec/scan/file_scanner.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vdirect_in_predicate.h"
#include "vec/exprs/vectorized_fn_call.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vin_predicate.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {
class RuntimeState;
namespace io {
struct IOContext;
enum class FileCachePolicy : uint8_t;
} // namespace io
} // namespace doris

namespace doris::vectorized {
#include "common/compile_check_begin.h"
// Threshold related to rewriting predicates on dictionary codes. It is currently set to the
// largest uint32_t value.
// NOTE(review): presumably this caps how many dictionary entries may be rewritten; the use
// site is outside this part of the file -- confirm.
// TODO: determine a proper value by testing.
static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = std::numeric_limits<uint32_t>::max();
// Zero-filled buffer of ColumnString::MAX_STRINGS_OVERFLOW_SIZE bytes.
// NOTE(review): presumably used as a safe, readable target for empty/null string slots --
// confirm at the use site.
static constexpr char EMPTY_STRING_FOR_OVERFLOW[ColumnString::MAX_STRINGS_OVERFLOW_SIZE] = "";
// Hive 0.11 and 0.12 do not support precision and scale for decimals, so the decimal type
// in ORC files produced by those versions is DECIMAL(0,0).
// These defaults supply a precision and scale for such files.
static constexpr int decimal_precision_for_hive11 = BeConsts::MAX_DECIMAL128_PRECISION;
static constexpr int decimal_scale_for_hive11 = 10;

// X-macro: expands M(primitive_type, value_type_name, orc_batch_type) once per entry below.
// Integral and boolean entries are paired with orc::LongVectorBatch, floating-point entries
// with orc::DoubleVectorBatch.
// Note that PrimitiveType::TYPE_INT is not part of this list.
#define FOR_FLAT_ORC_COLUMNS(M)                                   \
    M(PrimitiveType::TYPE_TINYINT, Int8, orc::LongVectorBatch)    \
    M(PrimitiveType::TYPE_BOOLEAN, UInt8, orc::LongVectorBatch)   \
    M(PrimitiveType::TYPE_SMALLINT, Int16, orc::LongVectorBatch)  \
    M(PrimitiveType::TYPE_BIGINT, Int64, orc::LongVectorBatch)    \
    M(PrimitiveType::TYPE_FLOAT, Float32, orc::DoubleVectorBatch) \
    M(PrimitiveType::TYPE_DOUBLE, Float64, orc::DoubleVectorBatch)

// Reads exactly `length` bytes starting at file position `offset` into `buf`.
//
// The underlying reader may return fewer bytes than requested, so reads are issued in a loop
// until the buffer is full or the reader reports zero bytes.
//
// Throws orc::ParseError when:
//   - the IO context asks the scan to stop (message "stop"),
//   - the underlying read returns a non-OK status,
//   - fewer than `length` bytes could be read.
void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
    uint64_t has_read = 0;
    char* out = static_cast<char*>(buf);
    while (has_read < length) {
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
            throw orc::ParseError("stop");
        }
        // Initialized so that a reader which leaves the out-parameter untouched is treated
        // as "nothing read" instead of consuming an indeterminate value.
        size_t loop_read = 0;
        Slice result(out + has_read, length - has_read);
        Status st = _tracing_file_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
        if (!st.ok()) {
            throw orc::ParseError(
                    absl::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
        }
        if (loop_read == 0) {
            // No progress: stop looping and report the short read below.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != length) {
        // Argument order follows the placeholders: $0 = requested bytes, $1 = file name,
        // $2 = bytes actually read.
        throw orc::ParseError(absl::Substitute("Try to read $0 bytes from $1, actually read $2",
                                               length, _file_name, has_read));
    }
}

// Reads exactly `length` bytes starting at file position `offset` into `buf` through the
// stream's inner reader.
//
// The inner reader may return fewer bytes than requested, so reads are issued in a loop
// until the buffer is full or the reader reports zero bytes.
//
// Throws orc::ParseError when:
//   - the IO context asks the scan to stop (message "stop"),
//   - the underlying read returns a non-OK status,
//   - fewer than `length` bytes could be read.
void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) {
    uint64_t has_read = 0;
    char* out = static_cast<char*>(buf);
    while (has_read < length) {
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
            throw orc::ParseError("stop");
        }
        // Initialized so that a reader which leaves the out-parameter untouched is treated
        // as "nothing read" instead of consuming an indeterminate value.
        size_t loop_read = 0;
        Slice result(out + has_read, length - has_read);
        Status st = _inner_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
        if (!st.ok()) {
            throw orc::ParseError(
                    absl::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
        }
        if (loop_read == 0) {
            // No progress: stop looping and report the short read below.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != length) {
        // Argument order follows the placeholders: $0 = requested bytes, $1 = file name,
        // $2 = bytes actually read.
        throw orc::ParseError(absl::Substitute("Try to read $0 bytes from $1, actually read $2",
                                               length, _file_name, has_read));
    }
}

// Constructs a reader for one scan range, with profiling and query state attached.
//
// - `batch_size` is raised to at least _MIN_BATCH_SIZE.
// - Min/max filtering follows the query option `enable_orc_filter_by_min_max`, and is
//   enabled when `state` is null.
// - `ctz` is the time zone name used to resolve `_time_zone` and to compute `_offset_days`.
//
// The constructor only records parameters and initializes the profile, system properties
// and file description; no file reader is created here (see _create_file_reader()).
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
                     size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
                     FileMetaCache* meta_cache, bool enable_lazy_mat)
        : _profile(profile),
          _state(state),
          _scan_params(params),
          _scan_range(range),
          _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
          _range_start_offset(range.start_offset),
          _range_size(range.size),
          _ctz(ctz),
          _io_ctx(io_ctx),
          _enable_lazy_mat(enable_lazy_mat),
          _enable_filter_by_min_max(
                  state == nullptr ? true : state->query_options().enable_orc_filter_by_min_max),
          _dict_cols_has_converted(false) {
    TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
    // Convert unix time 0 in the session time zone and look at the resulting day of month.
    VecDateTimeValue t;
    t.from_unixtime(0, ctz);
    // Day 31 means the epoch falls on 1969-12-31 in this time zone, so the offset is -1 day;
    // otherwise it is 0.
    _offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
    _meta_cache = meta_cache;
    _init_profile();
    _init_system_properties();
    _init_file_description();
}

// Constructs a reader without a runtime profile or runtime state.
//
// Min/max filtering is always enabled and no profile counters are registered
// (_init_profile() is not called).
// NOTE(review): members not listed in the initializer list (e.g. _state, _batch_size,
// _time_zone, _offset_days) presumably rely on in-class default initializers declared in
// the header -- confirm.
OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
                     const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* meta_cache,
                     bool enable_lazy_mat)
        : _profile(nullptr),
          _scan_params(params),
          _scan_range(range),
          _ctz(ctz),
          _file_system(nullptr),
          _io_ctx(io_ctx),
          _enable_lazy_mat(enable_lazy_mat),
          _enable_filter_by_min_max(true),
          _dict_cols_has_converted(false) {
    _meta_cache = meta_cache;
    _init_system_properties();
    _init_file_description();
}

// Flushes the locally accumulated reader statistics into the runtime profile counters and
// lets the file input stream, if this reader still holds one, report its own profile.
// Does nothing when the reader was created without a profile.
void OrcReader::_collect_profile_before_close() {
    if (_profile == nullptr) {
        return;
    }

    // Timers.
    COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
    COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
    COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
    COUNTER_UPDATE(_orc_profile.init_column_time, _statistics.init_column_time);
    COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
    COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
    COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
    COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
    COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);

    // Plain counters.
    COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);
    COUNTER_UPDATE(_orc_profile.file_footer_read_calls, _statistics.file_footer_read_calls);
    COUNTER_UPDATE(_orc_profile.file_footer_hit_cache, _statistics.file_footer_hit_cache);

    if (_file_input_stream != nullptr) {
        _file_input_stream->collect_profile_before_close();
    }
}

// Returns the length reported by the file input stream.
// NOTE(review): dereferences _file_input_stream unconditionally, while _create_file_reader()
// release()s that pointer when building the orc reader -- confirm callers only use this
// while the stream is still held.
int64_t OrcReader::size() const {
    const auto file_length = _file_input_stream->getLength();
    return file_length;
}

void OrcReader::_init_profile() {
    if (_profile != nullptr) {
        static const char* orc_profile = "OrcReader";
        ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1);
        _orc_profile.column_read_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", orc_profile, 1);
        _orc_profile.get_batch_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "GetBatchTime", orc_profile, 1);
        _orc_profile.create_reader_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CreateReaderTime", orc_profile, 1);
        _orc_profile.init_column_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "InitColumnTime", orc_profile, 1);
        _orc_profile.set_fill_column_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SetFillColumnTime", orc_profile, 1);
        _orc_profile.decode_value_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
        _orc_profile.decode_null_map_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
        _orc_profile.predicate_filter_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", orc_profile, 1);
        _orc_profile.dict_filter_rewrite_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", orc_profile, 1);
        _orc_profile.lazy_read_filtered_rows =
                ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead", TUnit::UNIT, 1);
        _orc_profile.selected_row_group_count =
                ADD_COUNTER_WITH_LEVEL(_profile, "SelectedRowGroupCount", TUnit::UNIT, 1);
        _orc_profile.evaluated_row_group_count =
                ADD_COUNTER_WITH_LEVEL(_profile, "EvaluatedRowGroupCount", TUnit::UNIT, 1);
        _orc_profile.file_footer_read_calls =
                ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterReadCalls", TUnit::UNIT, 1);
        _orc_profile.file_footer_hit_cache =
                ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterHitCache", TUnit::UNIT, 1);
    }
}

// Creates the underlying orc::Reader (`_reader`) if it does not exist yet.
//
// Steps:
//   1. Return immediately when `_reader` is already set.
//   2. Create the file input stream if needed; an empty file yields Status::EndOfFile.
//   3. Build the orc reader, optionally seeding it with a serialized file tail taken from
//      the file meta cache, or populating the cache after a cold read.
//
// Returns:
//   - OK on success,
//   - EndOfFile for an empty file or when the scan was asked to stop,
//   - NotFound when the error text indicates a missing file/object,
//   - InternalError for any other failure while creating the orc reader.
Status OrcReader::_create_file_reader() {
    SCOPED_RAW_TIMER(&_statistics.create_reader_time);
    if (_reader != nullptr) {
        return Status::OK();
    }

    if (_file_input_stream == nullptr) {
        // Use the modification time from the scan range when it is set, otherwise 0.
        _file_description.mtime =
                _scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
        io::FileReaderOptions reader_options =
                FileFactory::get_reader_options(_state, _file_description);
        auto inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                _profile, _system_properties, _file_description, reader_options,
                io::DelegateReader::AccessMode::RANDOM, _io_ctx));
        _file_input_stream = std::make_unique<ORCFileInputStream>(
                _scan_range.path, std::move(inner_reader), _io_ctx, _profile,
                _orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
    }
    if (_file_input_stream->getLength() == 0) {
        return Status::EndOfFile("empty orc file: " + _scan_range.path);
    }

    // create orc reader
    orc::ReaderOptions options;
    options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
    options.setReaderMetrics(&_reader_metrics);

    // Builds `_reader` from the current `options` and translates exceptions into Status.
    // The input stream is release()d into the orc reader, so `_file_input_stream` no longer
    // holds it once this lambda has run.
    auto create_orc_reader = [&]() {
        try {
            _reader = orc::createReader(
                    std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
        } catch (std::exception& e) {
            // invoker maybe just skip Status.NotFound and continue
            // so we need distinguish between it and other kinds of errors
            std::string _err_msg = e.what();
            // "stop" is the message thrown by the input stream's read() when the IO context
            // requests a stop; report it as end-of-file rather than as an error.
            if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
                return Status::EndOfFile("stop");
            }
            // one for fs, the other is for oss.
            if (_err_msg.find("No such file or directory") != std::string::npos ||
                _err_msg.find("NoSuchKey") != std::string::npos) {
                return Status::NotFound(_err_msg);
            }
            return Status::InternalError("Init OrcReader failed. reason = {}", _err_msg);
        }
        return Status::OK();
    };

    if (_meta_cache == nullptr) {
        // No meta cache: the footer is always read from the file.
        _statistics.file_footer_read_calls++;
        RETURN_IF_ERROR(create_orc_reader());
    } else {
        // The inner reader must be fetched before create_orc_reader() releases the stream.
        auto inner_file_reader = _file_input_stream->get_inner_reader();
        const auto& file_meta_cache_key =
                FileMetaCache::get_key(inner_file_reader, _file_description);

        // A local cache handle is sufficient here: per the original note,
        // setSerializedFileTail stores the footer by assignment rather than by reference.
        ObjLRUCache::CacheHandle _meta_cache_handle;
        if (_meta_cache->lookup(file_meta_cache_key, &_meta_cache_handle)) {
            // Cache hit: seed the reader options with the cached serialized file tail.
            const std::string* footer_ptr = _meta_cache_handle.data<String>();
            options.setSerializedFileTail(*footer_ptr);
            RETURN_IF_ERROR(create_orc_reader());
            _statistics.file_footer_hit_cache++;
        } else {
            // Cache miss: read the footer from the file, then publish it to the cache.
            _statistics.file_footer_read_calls++;
            RETURN_IF_ERROR(create_orc_reader());
            // NOTE(review): presumably the cache takes ownership of this heap-allocated
            // string on insert -- confirm against ObjLRUCache/FileMetaCache.
            std::string* footer_ptr = new std::string {_reader->getSerializedFileTail()};
            _meta_cache->insert(file_meta_cache_key, footer_ptr, &_meta_cache_handle);
        }
    }

    return Status::OK();
}

// Prepares the reader for scanning: stores the scan description (columns, conjuncts,
// descriptors, schema mapping, column id sets), picks up ORC IO tuning options from the
// query options when a runtime state is available, then creates the file reader and
// resolves which columns are read from the file and which are missing.
//
// Returns the first non-OK status from _create_file_reader() or _init_read_columns().
Status OrcReader::init_reader(
        const std::vector<std::string>* column_names, const VExprContextSPtrs& conjuncts,
        bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
        const VExprContextSPtrs* not_single_slot_filter_conjuncts,
        const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
        std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr,
        const std::set<uint64_t>& column_ids, const std::set<uint64_t>& filter_column_ids) {
    // Table/file layout.
    _table_column_names = column_names;
    _is_acid = is_acid;
    _tuple_descriptor = tuple_descriptor;
    _row_descriptor = row_descriptor;
    _table_info_node_ptr = table_info_node_ptr;
    _column_ids = column_ids;
    _filter_column_ids = filter_column_ids;

    // Predicates.
    _lazy_read_ctx.conjuncts = conjuncts;
    _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
    if (not_single_slot_filter_conjuncts != nullptr) {
        const VExprContextSPtrs& extra_conjuncts = *not_single_slot_filter_conjuncts;
        if (!extra_conjuncts.empty()) {
            _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
                                                     extra_conjuncts.begin(),
                                                     extra_conjuncts.end());
        }
    }

    _obj_pool = std::make_unique<ObjectPool>();

    // IO tuning knobs are only overridden when query options are available.
    if (_state != nullptr) {
        const auto& query_options = _state->query_options();
        _orc_tiny_stripe_threshold_bytes = query_options.orc_tiny_stripe_threshold_bytes;
        _orc_once_max_read_bytes = query_options.orc_once_max_read_bytes;
        _orc_max_merge_distance_bytes = query_options.orc_max_merge_distance_bytes;
    }

    RETURN_IF_ERROR(_create_file_reader());
    RETURN_IF_ERROR(_init_read_columns());
    return Status::OK();
}

// Initializes the file reader for schema parsing only.
// This simply forwards to _create_file_reader() and returns its status; no read columns
// are set up here.
Status OrcReader::init_schema_reader() {
    return _create_file_reader();
}

// Appends the name and the converted Doris type of every top-level field of the file
// schema to `col_names` / `col_types`.
//
// When `_is_acid` is set, remove_acid() is applied to the file's root type first, so the
// fields of the ACID "row" struct are reported when the root matches the ACID layout.
//
// Dereferences `_reader` without a check; callers are expected to have created it first
// (e.g. via init_schema_reader()). Always returns Status::OK().
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                    std::vector<DataTypePtr>* col_types) {
    const auto& root_type = _is_acid ? remove_acid(_reader->getType()) : _reader->getType();
    // Unsigned index: getSubtypeCount() is unsigned, matching the other subtype loops in
    // this file and avoiding a signed/unsigned comparison.
    for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
        col_names->emplace_back(root_type.getFieldName(i));
        col_types->emplace_back(convert_to_doris_type(root_type.getSubtype(i)));
    }
    return Status::OK();
}

// Resolves the columns to read from the file.
//
// 1. Rebuilds `_column_id_to_file_type`, mapping every ORC column id in the file schema
//    (including nested types) to its orc::Type.
// 2. Fills `_type_map` with file column name -> orc::Type. For ACID files the children of
//    a top-level STRUCT are registered under "<TransactionalHive::ROW>.<field>".
// 3. Splits the requested table columns into `_missing_cols` (not present according to
//    the schema mapping node) and `_read_file_cols` / `_read_table_cols`.
//
// Always returns Status::OK().
Status OrcReader::_init_read_columns() {
    SCOPED_RAW_TIMER(&_statistics.init_column_time);
    const auto& root_type = _reader->getType();

    // Build column ID to file type mapping for all columns in file
    _column_id_to_file_type.clear();
    // std::function is needed because the lambda calls itself recursively.
    std::function<void(const orc::Type*, int)> build_column_id_map = [&](const orc::Type* type,
                                                                         int depth) {
        if (type == nullptr) {
            return;
        }
        uint64_t col_id = type->getColumnId();
        _column_id_to_file_type[col_id] = type;

        // Indentation is only used to make the debug log reflect the nesting depth.
        std::string indent(depth * 2, ' ');
        VLOG_DEBUG << indent << "[OrcReader] Mapping column_id=" << col_id
                   << ", kind=" << static_cast<int>(type->getKind())
                   << ", subtype_count=" << type->getSubtypeCount();

        for (uint64_t i = 0; i < type->getSubtypeCount(); ++i) {
            build_column_id_map(type->getSubtype(i), depth + 1);
        }
    };

    VLOG_DEBUG << "[OrcReader] Building column ID to file type mapping...";
    build_column_id_map(&root_type, 0);
    VLOG_DEBUG << "[OrcReader] Total mapped columns: " << _column_id_to_file_type.size();

    if (_is_acid) {
        for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
            if (root_type.getSubtype(i)->getKind() == orc::TypeKind::STRUCT) {
                // Children of a top-level struct are exposed with the ROW prefix.
                const auto* row_orc_type = root_type.getSubtype(i);
                for (uint64_t j = 0; j < row_orc_type->getSubtypeCount(); j++) {
                    _type_map.emplace(TransactionalHive::ROW + "." + row_orc_type->getFieldName(j),
                                      row_orc_type->getSubtype(j));
                }
            } else {
                _type_map.emplace(root_type.getFieldName(i), root_type.getSubtype(i));
            }
        }
    } else {
        // Unsigned index to match getSubtypeCount() and the ACID branch above.
        for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
            _type_map.emplace(root_type.getFieldName(i), root_type.getSubtype(i));
        }
    }

    for (const auto& table_column_name : *_table_column_names) {
        if (!_table_info_node_ptr->children_column_exists(table_column_name)) {
            _missing_cols.emplace_back(table_column_name);
            continue;
        }
        const auto file_column_name =
                _table_info_node_ptr->children_file_column_name(table_column_name);
        _read_file_cols.emplace_back(file_column_name);
        _read_table_cols.emplace_back(table_column_name);
    }
    return Status::OK();
}

// Returns true when `type` is a STRUCT whose field count equals the number of ACID column
// names and whose field names, compared case-insensitively and in order, equal
// TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE.
bool OrcReader::_check_acid_schema(const orc::Type& type) {
    if (type.getKind() != orc::TypeKind::STRUCT) {
        return false;
    }
    const auto field_count = type.getSubtypeCount();
    if (field_count != TransactionalHive::ACID_COLUMN_NAMES.size()) {
        return false;
    }
    for (uint64_t idx = 0; idx < field_count; ++idx) {
        // Lower-case a copy of the field name in place before comparing.
        std::string lowered = type.getFieldName(idx);
        std::transform(lowered.begin(), lowered.end(), lowered.begin(),
                       [](unsigned char c) { return std::tolower(c); });
        if (lowered != TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE[idx]) {
            return false;
        }
    }
    return true;
}

// Returns the subtype at TransactionalHive::ROW_OFFSET when `type` matches the ACID
// layout (see _check_acid_schema()); otherwise returns `type` itself.
const orc::Type& OrcReader::remove_acid(const orc::Type& type) {
    if (!_check_acid_schema(type)) {
        return type;
    }
    const orc::Type* row_type = type.getSubtype(TransactionalHive::ROW_OFFSET);
    return *row_type;
}

// ORC only supports pushing down predicates of type LONG, FLOAT, STRING, DATE, DECIMAL,
// TIMESTAMP and BOOLEAN. This table maps each ORC column kind that can be pushed down to
// the predicate type used for it; kinds that are absent are not pushed down.
static std::unordered_map<orc::TypeKind, orc::PredicateDataType> TYPEKIND_TO_PREDICATE_TYPE = {
        {orc::TypeKind::BYTE, orc::PredicateDataType::LONG},
        {orc::TypeKind::SHORT, orc::PredicateDataType::LONG},
        {orc::TypeKind::INT, orc::PredicateDataType::LONG},
        {orc::TypeKind::LONG, orc::PredicateDataType::LONG},
        {orc::TypeKind::FLOAT, orc::PredicateDataType::FLOAT},
        {orc::TypeKind::DOUBLE, orc::PredicateDataType::FLOAT},
        {orc::TypeKind::STRING, orc::PredicateDataType::STRING},
        {orc::TypeKind::BINARY, orc::PredicateDataType::STRING},
        // CHAR must not be pushed down: CHAR values are fixed length and will be padded.
        // {orc::TypeKind::CHAR, orc::PredicateDataType::STRING},
        {orc::TypeKind::VARCHAR, orc::PredicateDataType::STRING},
        {orc::TypeKind::DATE, orc::PredicateDataType::DATE},
        {orc::TypeKind::DECIMAL, orc::PredicateDataType::DECIMAL},
        {orc::TypeKind::TIMESTAMP, orc::PredicateDataType::TIMESTAMP},
        {orc::TypeKind::BOOLEAN, orc::PredicateDataType::BOOLEAN}};

// Converts a Doris literal value into an orc::Literal suitable for a search argument.
//
// Template parameter:
//   primitive_type  Doris primitive type that describes how the bytes in `literal_data`
//                   are interpreted.
// Parameters:
//   type            ORC type of the file column the predicate applies to.
//   literal_data    Raw bytes of the literal value.
//   precision/scale Used only for DECIMAL columns; overridden with the DecimalV2 constants
//                   when primitive_type is TYPE_DECIMALV2.
// Returns:
//   {true, literal} when the combination of ORC kind and Doris type is supported,
//   {false, orc::Literal(false)} otherwise, or when a doris Exception is thrown during
//   conversion.
template <PrimitiveType primitive_type>
std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,
                                                      StringRef& literal_data, int precision,
                                                      int scale) {
    const auto* value = literal_data.data;
    try {
        switch (type->getKind()) {
        case orc::TypeKind::BOOLEAN: {
            if (primitive_type != TYPE_BOOLEAN) {
                return std::make_tuple(false, orc::Literal(false));
            }
            return std::make_tuple(true, orc::Literal(bool(*((uint8_t*)value))));
        }
        // All ORC integer kinds are widened to int64_t; non-integer Doris types are rejected.
        case orc::TypeKind::BYTE:
        case orc::TypeKind::SHORT:
        case orc::TypeKind::INT:
        case orc::TypeKind::LONG: {
            if constexpr (primitive_type == TYPE_TINYINT) {
                return std::make_tuple(true, orc::Literal(int64_t(*((int8_t*)value))));
            } else if constexpr (primitive_type == TYPE_SMALLINT) {
                return std::make_tuple(true, orc::Literal(int64_t(*((int16_t*)value))));
            } else if constexpr (primitive_type == TYPE_INT) {
                return std::make_tuple(true, orc::Literal(int64_t(*((int32_t*)value))));
            } else if constexpr (primitive_type == TYPE_BIGINT) {
                return std::make_tuple(true, orc::Literal(int64_t(*((int64_t*)value))));
            }
            return std::make_tuple(false, orc::Literal(false));
        }
        // An ORC FLOAT column accepts both FLOAT and DOUBLE literals (as double).
        case orc::TypeKind::FLOAT: {
            if constexpr (primitive_type == TYPE_FLOAT) {
                return std::make_tuple(true, orc::Literal(double(*((float*)value))));
            } else if constexpr (primitive_type == TYPE_DOUBLE) {
                return std::make_tuple(true, orc::Literal(double(*((double*)value))));
            }
            return std::make_tuple(false, orc::Literal(false));
        }
        // An ORC DOUBLE column accepts only DOUBLE literals.
        case orc::TypeKind::DOUBLE: {
            if (primitive_type == TYPE_DOUBLE) {
                return std::make_tuple(true, orc::Literal(*((double*)value)));
            }
            return std::make_tuple(false, orc::Literal(false));
        }
        case orc::TypeKind::STRING:
            [[fallthrough]];
        case orc::TypeKind::BINARY:
            [[fallthrough]];
        // CHAR must not be pushed down: CHAR values are fixed length and will be padded.
        // case orc::TypeKind::CHAR:
        //     [[fallthrough]];
        case orc::TypeKind::VARCHAR: {
            if (primitive_type == TYPE_STRING || primitive_type == TYPE_CHAR ||
                primitive_type == TYPE_VARCHAR) {
                return std::make_tuple(true, orc::Literal(literal_data.data, literal_data.size));
            }
            return std::make_tuple(false, orc::Literal(false));
        }
        case orc::TypeKind::DECIMAL: {
            // Load the unscaled value into a 128-bit integer according to the Doris type.
            int128_t decimal_value;
            if constexpr (primitive_type == TYPE_DECIMALV2) {
                decimal_value = *reinterpret_cast<const int128_t*>(value);
                // DecimalV2 has fixed precision and scale, regardless of the slot's values.
                precision = DecimalV2Value::PRECISION;
                scale = DecimalV2Value::SCALE;
            } else if constexpr (primitive_type == TYPE_DECIMAL32) {
                decimal_value = *((int32_t*)value);
            } else if constexpr (primitive_type == TYPE_DECIMAL64) {
                decimal_value = *((int64_t*)value);
            } else if constexpr (primitive_type == TYPE_DECIMAL128I) {
                decimal_value = *((int128_t*)value);
            } else {
                return std::make_tuple(false, orc::Literal(false));
            }
            // Split into the high and low 64-bit halves expected by orc::Int128.
            return std::make_tuple(true, orc::Literal(orc::Int128(uint64_t(decimal_value >> 64),
                                                                  uint64_t(decimal_value)),
                                                      precision, scale));
        }
        case orc::TypeKind::DATE: {
            // The literal is the number of days since the epoch, computed in UTC.
            int64_t day_offset;
            static const cctz::time_zone utc0 = cctz::utc_time_zone();
            if constexpr (primitive_type == TYPE_DATE) {
                const VecDateTimeValue date_v1 = *reinterpret_cast<const VecDateTimeValue*>(value);
                cctz::civil_day civil_date(date_v1.year(), date_v1.month(), date_v1.day());
                day_offset =
                        cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60);
            } else if (primitive_type == TYPE_DATEV2) {
                const DateV2Value<DateV2ValueType> date_v2 =
                        *reinterpret_cast<const DateV2Value<DateV2ValueType>*>(value);
                cctz::civil_day civil_date(date_v2.year(), date_v2.month(), date_v2.day());
                day_offset =
                        cctz::convert(civil_date, utc0).time_since_epoch().count() / (24 * 60 * 60);
            } else {
                return std::make_tuple(false, orc::Literal(false));
            }
            return std::make_tuple(true, orc::Literal(orc::PredicateDataType::DATE, day_offset));
        }
        case orc::TypeKind::TIMESTAMP: {
            // The literal is (seconds since the epoch in UTC, nanoseconds).
            int64_t seconds;
            int32_t nanos;
            static const cctz::time_zone utc0 = cctz::utc_time_zone();
            // TODO: ColumnValueRange has lost the precision of microsecond
            if constexpr (primitive_type == TYPE_DATETIME) {
                const VecDateTimeValue datetime_v1 =
                        *reinterpret_cast<const VecDateTimeValue*>(value);
                cctz::civil_second civil_seconds(datetime_v1.year(), datetime_v1.month(),
                                                 datetime_v1.day(), datetime_v1.hour(),
                                                 datetime_v1.minute(), datetime_v1.second());
                seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count();
                nanos = 0;
            } else if (primitive_type == TYPE_DATETIMEV2) {
                const DateV2Value<DateTimeV2ValueType> datetime_v2 =
                        *reinterpret_cast<const DateV2Value<DateTimeV2ValueType>*>(value);
                cctz::civil_second civil_seconds(datetime_v2.year(), datetime_v2.month(),
                                                 datetime_v2.day(), datetime_v2.hour(),
                                                 datetime_v2.minute(), datetime_v2.second());
                seconds = cctz::convert(civil_seconds, utc0).time_since_epoch().count();
                // Microseconds to nanoseconds.
                nanos = datetime_v2.microsecond() * 1000;
            } else {
                return std::make_tuple(false, orc::Literal(false));
            }
            return std::make_tuple(true, orc::Literal(seconds, nanos));
        }
        default:
            return std::make_tuple(false, orc::Literal(false));
        }
    } catch (Exception& e) {
        // This can happen when the table schema changed and the new schema is used to read
        // old data.
        LOG(WARNING) << "Failed to convert doris value to orc predicate literal, error = "
                     << e.what();
        return std::make_tuple(false, orc::Literal(false));
    }
}

// Resolves the ORC predicate type for the file column referenced by `slot_ref`.
//
// The table column name of the slot is translated to its file column name through the
// schema mapping node, the file column's orc::Type is looked up in `_type_map`, and its
// kind is mapped through TYPEKIND_TO_PREDICATE_TYPE.
//
// Returns {true, predicate type} on success. Returns {false, PredicateDataType::LONG}
// (the type is a placeholder) when the file column is unknown or its kind cannot be
// pushed down; a warning is logged in both cases.
std::pair<bool, orc::PredicateDataType> OrcReader::_get_orc_predicate_type(
        const VSlotRef* slot_ref) {
    DCHECK(_table_info_node_ptr->children_column_exists(slot_ref->expr_name()));
    const auto file_col_name =
            _table_info_node_ptr->children_file_column_name(slot_ref->expr_name());

    // Single lookup instead of contains() followed by operator[].
    const auto type_it = _type_map.find(file_col_name);
    if (type_it == _type_map.end()) {
        LOG(WARNING) << "Column " << slot_ref->expr_name() << " in file name " << file_col_name
                     << " not found in _type_map";
        return {false, orc::PredicateDataType::LONG};
    }
    const auto* orc_type = type_it->second;

    // find() keeps the lookup read-only; operator[] could insert into the shared table.
    const auto predicate_it = TYPEKIND_TO_PREDICATE_TYPE.find(orc_type->getKind());
    if (predicate_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
        LOG(WARNING) << "Unsupported Push Down Orc Type [TypeKind=" << orc_type->getKind() << "]";
        return {false, orc::PredicateDataType::LONG};
    }
    return {true, predicate_it->second};
}

// Builds the orc::Literal for comparing the column referenced by `slot_ref` against
// `literal`.
//
// Returns {true, literal} on success and {false, orc::Literal(false)} when:
//   - the column has no usable ORC predicate type (see _get_orc_predicate_type()),
//   - the literal value is NULL,
//   - the table column is a string type while the file column is a different, non-string
//     type,
//   - the slot's primitive type is not handled by the switch below, or
//     convert_to_orc_literal() rejects the combination.
std::pair<bool, orc::Literal> OrcReader::_make_orc_literal(const VSlotRef* slot_ref,
                                                           const VLiteral* literal) {
    // Validate that the column can be pushed down at all; only the validity flag is used.
    auto [valid_pred_type, predicate_type] = _get_orc_predicate_type(slot_ref);
    if (!valid_pred_type) {
        return {false, orc::Literal(false)};
    }

    // Get the orc_type again here as it's needed for convert_to_orc_literal
    auto file_col_name = _table_info_node_ptr->children_file_column_name(slot_ref->expr_name());
    const auto* orc_type = _type_map[file_col_name];

    DCHECK(literal != nullptr);
    // this only happens when the literals of in_predicate contains null value, like in (1, null)
    if (literal->get_column_ptr()->is_null_at(0)) {
        return {false, orc::Literal(false)};
    }
    auto literal_data = literal->get_column_ptr()->get_data_at(0);
    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
    auto slot_type = slot->type();
    // primitive_type: type of the table slot; src_type: Doris type derived from the file column.
    auto primitive_type = slot_type->get_primitive_type();
    auto src_type = convert_to_doris_type(orc_type)->get_primitive_type();
    // Do not push down the predicate when the column was changed from a non-string type to
    // a string type.
    // NOTE(review): the message prints the table type first and the file type second --
    // confirm that "<table type> to <file type>" is the intended wording.
    if (src_type != primitive_type && !is_string_type(src_type) && is_string_type(primitive_type)) {
        LOG(WARNING) << "Unsupported Push Down Schema Changed Column " << primitive_type << " to "
                     << src_type;
        return {false, orc::Literal(false)};
    }
    // Dispatch on the slot's primitive type: M(NAME) expands to one `case TYPE_NAME` that
    // calls convert_to_orc_literal<TYPE_NAME> with the slot's precision and scale.
    // Note that CHAR is not in the list below, so it falls through to `default`.
    switch (primitive_type) {
#define M(NAME)                                                                              \
    case TYPE_##NAME: {                                                                      \
        auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(                     \
                orc_type, literal_data, slot_type->get_precision(), slot_type->get_scale()); \
        return {valid, orc_literal};                                                         \
    }
#define APPLY_FOR_PRIMITIVE_TYPE(M) \
    M(TINYINT)                      \
    M(SMALLINT)                     \
    M(INT)                          \
    M(BIGINT)                       \
    M(LARGEINT)                     \
    M(DATE)                         \
    M(DATETIME)                     \
    M(DATEV2)                       \
    M(DATETIMEV2)                   \
    M(VARCHAR)                      \
    M(STRING)                       \
    M(HLL)                          \
    M(DECIMAL32)                    \
    M(DECIMAL64)                    \
    M(DECIMAL128I)                  \
    M(DECIMAL256)                   \
    M(DECIMALV2)                    \
    M(BOOLEAN)                      \
    M(IPV4)                         \
    M(IPV6)
        APPLY_FOR_PRIMITIVE_TYPE(M)
#undef M
    default: {
        VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << slot->col_name() << "]";
        return {false, orc::Literal(false)};
    }
    }
}

// check if the slot of expr can be pushed down to orc reader and make orc predicate type
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
    if (!expr->children()[0]->is_slot_ref()) {
        return false;
    }
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    // check if the slot exists in orc file and not partition column
    if (_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name()) ||
        (!_table_info_node_ptr->children_column_exists(slot_ref->expr_name()))) {
        return false;
    }

    // Directly use _get_orc_predicate_type since we only need the type
    auto [valid, predicate_type] = _get_orc_predicate_type(slot_ref);
    if (valid) {
        _vslot_ref_to_orc_predicate_data_type[slot_ref] = predicate_type;
    }
    return valid;
}

// check if the literal of expr can be pushed down to orc reader and make orc literal
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, size_t child_id) {
    if (!expr->children()[child_id]->is_literal()) {
        return false;
    }
    // the slot has been checked in _check_slot_can_push_down before calling this function
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    const auto* literal = static_cast<const VLiteral*>(expr->children()[child_id].get());
    auto [valid, orc_literal] = _make_orc_literal(slot_ref, literal);
    if (valid) {
        _vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
    }
    return valid;
}

// check if there are rest children of expr can be pushed down to orc reader
bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
    if (expr->children().size() < 2) {
        return false;
    }

    bool at_least_one_child_can_push_down = false;
    for (size_t i = 1; i < expr->children().size(); ++i) {
        if (_check_literal_can_push_down(expr, i)) {
            at_least_one_child_can_push_down = true;
        }
    }
    return at_least_one_child_can_push_down;
}

// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
    if (expr == nullptr) {
        return false;
    }

    switch (expr->op()) {
    case TExprOpcode::COMPOUND_AND:
        // at least one child can be pushed down
        return std::ranges::any_of(expr->children(), [this](const auto& child) {
            return _check_expr_can_push_down(child);
        });
    case TExprOpcode::COMPOUND_OR:
        // all children must be pushed down
        return std::ranges::all_of(expr->children(), [this](const auto& child) {
            return _check_expr_can_push_down(child);
        });
    case TExprOpcode::COMPOUND_NOT:
        DCHECK_EQ(expr->children().size(), 1);
        return _check_expr_can_push_down(expr->children()[0]);

    case TExprOpcode::GE:
    case TExprOpcode::GT:
    case TExprOpcode::LE:
    case TExprOpcode::LT:
    case TExprOpcode::EQ:
    case TExprOpcode::NE:
    case TExprOpcode::FILTER_IN:
    case TExprOpcode::FILTER_NOT_IN:
        // can't push down if expr is null aware predicate
        return expr->node_type() != TExprNodeType::NULL_AWARE_BINARY_PRED &&
               expr->node_type() != TExprNodeType::NULL_AWARE_IN_PRED &&
               _check_slot_can_push_down(expr) && _check_rest_children_can_push_down(expr);

    case TExprOpcode::INVALID_OPCODE:
        if (expr->node_type() == TExprNodeType::FUNCTION_CALL) {
            auto fn_name = expr->fn().name.function_name;
            // only support is_null_pred and is_not_null_pred
            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
                return _check_slot_can_push_down(expr);
            }
            VLOG_CRITICAL << "Unsupported function [funciton=" << fn_name << "]";
        }
        return false;
    default:
        VLOG_CRITICAL << "Unsupported Opcode [OpCode=" << expr->op() << "]";
        return false;
    }
}

void OrcReader::_build_less_than(const VExprSPtr& expr,
                                 std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    DCHECK(expr->children().size() == 2);
    DCHECK(expr->children()[0]->is_slot_ref());
    DCHECK(expr->children()[1]->is_literal());
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
    DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
    auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
    DCHECK(_vliteral_to_orc_literal.contains(literal));
    auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
    builder->lessThan(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
                      predicate_type, orc_literal);
}

void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
                                        std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    DCHECK(expr->children().size() == 2);
    DCHECK(expr->children()[0]->is_slot_ref());
    DCHECK(expr->children()[1]->is_literal());
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
    DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
    auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
    DCHECK(_vliteral_to_orc_literal.contains(literal));
    auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
    builder->lessThanEquals(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
                            predicate_type, orc_literal);
}

void OrcReader::_build_equals(const VExprSPtr& expr,
                              std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    DCHECK(expr->children().size() == 2);
    DCHECK(expr->children()[0]->is_slot_ref());
    DCHECK(expr->children()[1]->is_literal());
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
    DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
    auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
    DCHECK(_vliteral_to_orc_literal.contains(literal));
    auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
    builder->equals(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
                    predicate_type, orc_literal);
}

void OrcReader::_build_filter_in(const VExprSPtr& expr,
                                 std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    DCHECK(expr->children().size() >= 2);
    DCHECK(expr->children()[0]->is_slot_ref());
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    std::vector<orc::Literal> literals;
    DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
    orc::PredicateDataType predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
    for (size_t i = 1; i < expr->children().size(); ++i) {
        DCHECK(expr->children()[i]->is_literal());
        const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
        if (_vliteral_to_orc_literal.contains(literal)) {
            auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
            literals.emplace_back(orc_literal);
        }
    }
    DCHECK(!literals.empty());
    if (literals.size() == 1) {
        builder->equals(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
                        predicate_type, literals[0]);
    } else {
        builder->in(_table_info_node_ptr->children_file_column_name(slot_ref->expr_name()),
                    predicate_type, literals);
    }
}

// Append `<file column> IS NULL` to `builder`.
// Expects `expr` to have a single slot child that was registered by a preceding
// _check_expr_can_push_down call.
void OrcReader::_build_is_null(const VExprSPtr& expr,
                               std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    const auto& args = expr->children();
    DCHECK(args.size() == 1);
    DCHECK(args[0]->is_slot_ref());
    const auto* column_ref = static_cast<const VSlotRef*>(args[0].get());

    DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(column_ref));
    const auto orc_type = _vslot_ref_to_orc_predicate_data_type[column_ref];

    const auto file_column =
            _table_info_node_ptr->children_file_column_name(column_ref->expr_name());
    builder->isNull(file_column, orc_type);
}

bool OrcReader::_build_search_argument(const VExprSPtr& expr,
                                       std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
    // OPTIMIZE: check expr only once
    if (!_check_expr_can_push_down(expr)) {
        return false;
    }
    switch (expr->op()) {
    case TExprOpcode::COMPOUND_AND: {
        builder->startAnd();
        bool at_least_one_can_push_down = false;
        for (const auto& child : expr->children()) {
            if (_build_search_argument(child, builder)) {
                at_least_one_can_push_down = true;
            }
        }
        DCHECK(at_least_one_can_push_down);
        builder->end();
        break;
    }
    case TExprOpcode::COMPOUND_OR: {
        builder->startOr();
        bool all_can_push_down = true;
        for (const auto& child : expr->children()) {
            if (!_build_search_argument(child, builder)) {
                all_can_push_down = false;
            }
        }
        DCHECK(all_can_push_down);
        builder->end();
        break;
    }
    case TExprOpcode::COMPOUND_NOT: {
        DCHECK_EQ(expr->children().size(), 1);
        builder->startNot();
        auto res = _build_search_argument(expr->children()[0], builder);
        DCHECK(res);
        builder->end();
        break;
    }
    case TExprOpcode::GE:
        builder->startNot();
        _build_less_than(expr, builder);
        builder->end();
        break;
    case TExprOpcode::GT:
        builder->startNot();
        _build_less_than_equals(expr, builder);
        builder->end();
        break;
    case TExprOpcode::LE:
        _build_less_than_equals(expr, builder);
        break;
    case TExprOpcode::LT:
        _build_less_than(expr, builder);
        break;
    case TExprOpcode::EQ:
        _build_equals(expr, builder);
        break;
    case TExprOpcode::NE:
        builder->startNot();
        _build_equals(expr, builder);
        builder->end();
        break;
    case TExprOpcode::FILTER_IN:
        _build_filter_in(expr, builder);
        break;
    case TExprOpcode::FILTER_NOT_IN:
        builder->startNot();
        _build_filter_in(expr, builder);
        builder->end();
        break;
    // is null and is not null is represented as function call
    case TExprOpcode::INVALID_OPCODE:
        DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
        if (expr->fn().name.function_name == "is_null_pred") {
            _build_is_null(expr, builder);
        } else if (expr->fn().name.function_name == "is_not_null_pred") {
            builder->startNot();
            _build_is_null(expr, builder);
            builder->end();
        } else {
            // should not reach here, because _check_expr_can_push_down has already checked
            __builtin_unreachable();
        }
        break;

    default:
        // should not reach here, because _check_expr_can_push_down has already checked
        __builtin_unreachable();
    }
    return true;
}

// Build one ORC search argument out of `exprs` (combined with AND) and install it into
// _row_reader_options.
// Returns false, leaving the reader options untouched, when none of the expressions could be
// pushed down; returns true when at least one of them was.
bool OrcReader::_init_search_argument(const VExprSPtrs& exprs) {
    auto sarg_builder = orc::SearchArgumentFactory::newBuilder();
    sarg_builder->startAnd();

    bool pushed_any = false;
    for (const auto& conjunct : exprs) {
        // The caches are refilled by the check that runs inside _build_search_argument.
        _vslot_ref_to_orc_predicate_data_type.clear();
        _vliteral_to_orc_literal.clear();
        pushed_any |= _build_search_argument(conjunct, sarg_builder);
    }

    // if all exprs can not be pushed down, builder->end() will throw exception
    if (!pushed_any) {
        return false;
    }
    sarg_builder->end();

    auto search_argument = sarg_builder->build();
    _profile->add_info_string("OrcReader SearchArgument: ", search_argument->toString());
    _row_reader_options.searchArgument(std::move(search_argument));
    return true;
}

// Classify the columns of this scan and create the ORC row reader.
//
// Steps, in order:
//   1. Walk every conjunct, record the slots it references (predicate columns) and collect
//      the expressions that can be pushed down into _push_down_exprs.
//   2. Split the table columns into predicate columns and lazily read columns.
//   3. Split `partition_columns` and `missing_columns` depending on whether a predicate
//      references them.
//   4. Decide whether lazy read is possible and, if enabled, build the search argument.
//   5. Configure _row_reader_options, optionally install the tiny-stripe range cache reader,
//      create the row reader / batch and rebuild _colname_to_idx and _type_map.
//   6. Append the remaining filter conjuncts to _filter_conjuncts.
//
// @param partition_columns  column name -> (partition value, slot descriptor)
// @param missing_columns    column name -> default value expression (may be nullptr)
// @return InternalError when check_orc_init_sargs_success is set and nothing could be pushed
//         down, or when creating the row reader throws (except for the "stop" exception of a
//         stopped io context); OK otherwise.
Status OrcReader::set_fill_columns(
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    SCOPED_RAW_TIMER(&_statistics.set_fill_column_time);

    // std::unordered_map<column_name, std::pair<col_id, slot_id>>
    std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_table_columns;
    // visit_slot for lazy mat.
    std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
        if (expr->is_slot_ref()) {
            VSlotRef* slot_ref = static_cast<VSlotRef*>(expr);
            auto expr_name = slot_ref->expr_name();
            predicate_table_columns.emplace(
                    expr_name, std::make_pair(slot_ref->column_id(), slot_ref->slot_id()));
            if (slot_ref->column_id() == 0) {
                _lazy_read_ctx.resize_first_column = false;
            }
            return;
        }
        for (auto& child : expr->children()) {
            visit_slot(child.get());
        }
    };

    for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
        // `expr` may be replaced below by the expression that is actually pushed down.
        auto expr = conjunct->root();

        if (expr->is_rf_wrapper()) {
            // REF: src/runtime_filter/runtime_filter_consumer.cpp
            auto* runtime_filter = static_cast<VRuntimeFilterWrapper*>(expr.get());

            auto filter_impl = runtime_filter->get_impl();
            visit_slot(filter_impl.get());

            // only support push down for filter row group : MIN_FILTER, MAX_FILTER, MINMAX_FILTER, IN_FILTER
            if ((runtime_filter->node_type() == TExprNodeType::BINARY_PRED) &&
                (runtime_filter->op() == TExprOpcode::GE ||
                 runtime_filter->op() == TExprOpcode::LE)) {
                expr = filter_impl;
            } else if (runtime_filter->node_type() == TExprNodeType::IN_PRED &&
                       runtime_filter->op() == TExprOpcode::FILTER_IN) {
                auto* direct_in_predicate = static_cast<VDirectInPredicate*>(filter_impl.get());

                // Skip empty IN sets and sets larger than the per-column push down limit.
                int max_in_size =
                        _state->query_options().__isset.max_pushdown_conditions_per_column
                                ? _state->query_options().max_pushdown_conditions_per_column
                                : 1024;
                if (direct_in_predicate->get_set_func()->size() == 0 ||
                    direct_in_predicate->get_set_func()->size() > max_in_size) {
                    continue;
                }

                VExprSPtr new_in_slot = nullptr;
                if (direct_in_predicate->get_slot_in_expr(new_in_slot)) {
                    expr = new_in_slot;
                } else {
                    continue;
                }
            } else {
                continue;
            }
        } else if (VTopNPred* topn_pred = typeid_cast<VTopNPred*>(expr.get())) {
            // top runtime filter : only le && ge.
            DCHECK(topn_pred->children().size() > 0);
            visit_slot(topn_pred->children()[0].get());

            VExprSPtr binary_expr;
            if (topn_pred->get_binary_expr(binary_expr)) {
                // for min-max filter.
                expr = binary_expr;
            } else {
                continue;
            }
        } else {
            visit_slot(expr.get());
        }

        if (_check_expr_can_push_down(expr)) {
            _push_down_exprs.emplace_back(expr);
        }
    }

    if (_is_acid) {
        _lazy_read_ctx.predicate_orc_columns.insert(
                _lazy_read_ctx.predicate_orc_columns.end(),
                TransactionalHive::READ_ROW_COLUMN_NAMES.begin(),
                TransactionalHive::READ_ROW_COLUMN_NAMES.end());
    }

    // Split the table columns into predicate columns and lazily read columns.
    for (auto& read_table_col : _read_table_cols) {
        _lazy_read_ctx.all_read_columns.emplace_back(read_table_col);
        if (!predicate_table_columns.empty()) {
            auto iter = predicate_table_columns.find(read_table_col);
            if (iter == predicate_table_columns.end()) {
                // For ACID tables the names in READ_ROW_COLUMN_NAMES_LOWER_CASE are never
                // added to the lazy read columns.
                if (!_is_acid ||
                    std::find(TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                              TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end(),
                              read_table_col) ==
                            TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end()) {
                    _lazy_read_ctx.lazy_read_columns.emplace_back(read_table_col);
                }
            } else {
                _lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
                _lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);

                _lazy_read_ctx.predicate_orc_columns.emplace_back(
                        _table_info_node_ptr->children_file_column_name(iter->first));
            }
        }
    }

    for (const auto& kv : partition_columns) {
        auto iter = predicate_table_columns.find(kv.first);
        if (iter == predicate_table_columns.end()) {
            _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
        } else {
            _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second);
        }
    }

    for (const auto& kv : missing_columns) {
        auto iter = predicate_table_columns.find(kv.first);
        if (iter == predicate_table_columns.end()) {
            _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
        } else {
            //For check missing column :   missing column == xx, missing column is null,missing column is not null.
            // _slot_id_to_filter_conjuncts may be unset (it is null-checked further below as
            // well), so guard the lookup.
            if (_slot_id_to_filter_conjuncts != nullptr) {
                auto conjuncts_iter = _slot_id_to_filter_conjuncts->find(iter->second.second);
                if (conjuncts_iter != _slot_id_to_filter_conjuncts->end()) {
                    for (const auto& ctx : conjuncts_iter->second) {
                        // TODO(review): the original author marked this step as questionable.
                        _filter_conjuncts.emplace_back(ctx);
                    }
                }
            }

            // predicate_missing_columns is VLiteral.To fill in default values for missing columns.
            _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
        }
    }

    if (_enable_lazy_mat && !_lazy_read_ctx.predicate_columns.first.empty() &&
        !_lazy_read_ctx.lazy_read_columns.empty()) {
        _lazy_read_ctx.can_lazy_read = true;
    }

    if (_lazy_read_ctx.conjuncts.empty()) {
        _lazy_read_ctx.can_lazy_read = false;
    } else if (_enable_filter_by_min_max) {
        auto res = _init_search_argument(_push_down_exprs);
        if (_state->query_options().check_orc_init_sargs_success && !res) {
            std::stringstream ss;
            for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
                ss << conjunct->root()->debug_string() << "\n";
            }
            std::string conjuncts_str = ss.str();
            return Status::InternalError(
                    "Session variable check_orc_init_sargs_success is set, but "
                    "_init_search_argument returns false because all exprs can not be pushed "
                    "down:\n " +
                    conjuncts_str);
        }
    }
    try {
        _row_reader_options.range(_range_start_offset, _range_size);
        _row_reader_options.setTimezoneName(_ctz == "CST" ? "Asia/Shanghai" : _ctz);
        if (!_column_ids.empty()) {
            std::list<uint64_t> column_ids_list(_column_ids.begin(), _column_ids.end());
            _row_reader_options.includeTypes(column_ids_list);
        } else { // If column_ids is empty, include all top-level columns to be read.
            _row_reader_options.include(_read_file_cols);
        }
        _row_reader_options.setEnableLazyDecoding(true);

        //orc reader should not use the tiny stripe optimization when reading by row id.
        if (!_read_by_rows) {
            uint64_t number_of_stripes = _reader->getNumberOfStripes();
            auto all_stripes_needed = _reader->getNeedReadStripes(_row_reader_options);

            int64_t range_end_offset = _range_start_offset + _range_size;

            bool all_tiny_stripes = true;
            std::vector<io::PrefetchRange> tiny_stripe_ranges;

            // Collect the needed stripes that intersect this scan range; give up as soon as
            // one of them is larger than the tiny stripe threshold.
            for (uint64_t i = 0; i < number_of_stripes; i++) {
                std::unique_ptr<orc::StripeInformation> strip_info = _reader->getStripe(i);
                uint64_t strip_start_offset = strip_info->getOffset();
                uint64_t strip_end_offset = strip_start_offset + strip_info->getLength();

                if (strip_start_offset >= range_end_offset ||
                    strip_end_offset < _range_start_offset || !all_stripes_needed[i]) {
                    continue;
                }
                if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) {
                    all_tiny_stripes = false;
                    break;
                }

                tiny_stripe_ranges.emplace_back(strip_start_offset, strip_end_offset);
            }
            if (all_tiny_stripes && number_of_stripes > 0) {
                std::vector<io::PrefetchRange> prefetch_merge_ranges =
                        io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
                                                                     _orc_max_merge_distance_bytes,
                                                                     _orc_once_max_read_bytes);
                auto range_finder = std::make_shared<io::LinearProbeRangeFinder>(
                        std::move(prefetch_merge_ranges));

                // Replace the stream's file reader with a RangeCacheFileReader that wraps
                // the inner reader and the merged ranges.
                auto* orc_input_stream_ptr = static_cast<ORCFileInputStream*>(_reader->getStream());
                orc_input_stream_ptr->set_all_tiny_stripes();
                auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
                auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
                orc_file_reader = std::make_shared<io::RangeCacheFileReader>(
                        _profile, orc_inner_reader, range_finder);
            }
        }

        // Without lazy read there is no separate predicate phase: fold the predicate
        // partition / missing columns back into the regular ones.
        if (!_lazy_read_ctx.can_lazy_read) {
            for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
                _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
            }
            for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
                _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
            }
        }

        _fill_all_columns = true;
        // create orc row reader
        if (_lazy_read_ctx.can_lazy_read) {
            _row_reader_options.filter(_lazy_read_ctx.predicate_orc_columns);
            _orc_filter = std::make_unique<ORCFilterImpl>(this);
        }
        if (!_lazy_read_ctx.conjuncts.empty()) {
            _string_dict_filter = std::make_unique<StringDictFilterImpl>(this);
        }
        _row_reader = _reader->createRowReader(_row_reader_options, _orc_filter.get(),
                                               _string_dict_filter.get());

        _batch = _row_reader->createRowBatch(_batch_size);
        const auto& selected_type = _row_reader->getSelectedType();

        // Rebuild the name -> batch index map and the name -> orc type map in one pass.
        // For ACID files the fields of a STRUCT child are flattened and prefixed with
        // TransactionalHive::ROW + ".".
        _type_map.clear();
        int idx = 0;
        for (uint64_t i = 0; i < selected_type.getSubtypeCount(); ++i) {
            const auto* sub_type = selected_type.getSubtype(i);
            if (_is_acid && sub_type->getKind() == orc::TypeKind::STRUCT) {
                for (uint64_t j = 0; j < sub_type->getSubtypeCount(); ++j) {
                    std::string field_name =
                            TransactionalHive::ROW + "." + sub_type->getFieldName(j);
                    _colname_to_idx[field_name] = idx++;
                    _type_map.emplace(field_name, sub_type->getSubtype(j));
                }
            } else {
                std::string field_name = selected_type.getFieldName(i);
                _colname_to_idx[field_name] = idx++;
                _type_map.emplace(field_name, sub_type);
            }
        }

        _remaining_rows = _row_reader->getNumberOfRows();

    } catch (const std::exception& e) {
        std::string _err_msg = e.what();
        // ignore stop exception
        if (!(_io_ctx && _io_ctx->should_stop && _err_msg == "stop")) {
            return Status::InternalError("Failed to create orc row reader. reason = {}", _err_msg);
        }
    }

    if (!_not_single_slot_filter_conjuncts.empty()) {
        _filter_conjuncts.insert(_filter_conjuncts.end(), _not_single_slot_filter_conjuncts.begin(),
                                 _not_single_slot_filter_conjuncts.end());
        _disable_dict_filter = true;
    }

    if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
        // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
        // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
            auto& [value, slot_desc] = kv.second;
            auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
            if (iter != _slot_id_to_filter_conjuncts->end()) {
                for (const auto& ctx : iter->second) {
                    _filter_conjuncts.push_back(ctx);
                }
            }
        }
    }
    return Status::OK();
}

// Fill every partition column of `block` with `rows` copies of its partition value.
//
// @param block              block whose partition columns are appended to
// @param rows               number of rows to append to each partition column
// @param partition_columns  column name -> (partition value as text, slot descriptor)
// @return InternalError when a partition column is not part of `block`, when the value can
//         not be deserialized, or when fewer/more than `rows` values were written.
Status OrcReader::_fill_partition_columns(
        Block* block, uint64_t rows,
        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                partition_columns) {
    DataTypeSerDe::FormatOptions _text_formatOptions;
    // todo: maybe do not need to build name to index map every time
    auto name_to_pos_map = block->get_name_to_pos_map();
    for (const auto& kv : partition_columns) {
        // operator[] would silently create an entry with position 0 for an unknown name and
        // the value would be written into the wrong column; fail explicitly instead, the same
        // way _fill_missing_columns does.
        if (!name_to_pos_map.contains(kv.first)) {
            return Status::InternalError("Failed to find partition column: {}, block: {}",
                                         kv.first, block->dump_structure());
        }
        auto col_ptr = block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
        const auto& [value, slot_desc] = kv.second;
        auto text_serde = slot_desc->get_data_type_ptr()->get_serde();
        Slice slice(value.data(), value.size());
        uint64_t num_deserialized = 0;
        if (text_serde->deserialize_column_from_fixed_json(*col_ptr, slice, rows, &num_deserialized,
                                                           _text_formatOptions) != Status::OK()) {
            return Status::InternalError("Failed to fill partition column: {}={}",
                                         slot_desc->col_name(), value);
        }
        if (num_deserialized != rows) {
            // expected = rows requested, actual = rows reported by the serde
            return Status::InternalError(
                    "Failed to fill partition column: {}={} ."
                    "Number of rows expected to be written : {}, number of rows actually "
                    "written : "
                    "{}",
                    slot_desc->col_name(), value, rows, num_deserialized);
        }
    }
    return Status::OK();
}

// Fill the columns listed in `missing_columns` for `rows` rows.
// A column without a default expression (nullptr context) receives `rows` default values
// through its ColumnNullable interface; otherwise the default expression is executed and,
// when it produced a new column in `block`, that column replaces the missing one and the
// temporary column is erased at the end.
// Returns InternalError when a missing column is not part of `block`, or the error of the
// default expression.
Status OrcReader::_fill_missing_columns(
        Block* block, uint64_t rows,
        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
    // todo: maybe do not need to build name to index map every time
    auto name_to_pos_map = block->get_name_to_pos_map();
    std::set<size_t> positions_to_erase;
    for (const auto& [column_name, default_ctx] : missing_columns) {
        const auto pos_iter = name_to_pos_map.find(column_name);
        if (pos_iter == name_to_pos_map.end()) {
            return Status::InternalError("Failed to find missing column: {}, block: {}",
                                         column_name, block->dump_structure());
        }
        const auto column_pos = pos_iter->second;

        if (default_ctx == nullptr) {
            // no default column, fill with null
            auto target_column = block->get_by_position(column_pos).column->assume_mutable();
            // NOTE(review): the cast assumes the column is a ColumnNullable — confirm.
            static_cast<vectorized::ColumnNullable*>(target_column.get())
                    ->insert_many_defaults(rows);
            continue;
        }

        // fill with default value
        auto origin_column_num = block->columns();
        int result_column_id = -1;
        RETURN_IF_ERROR(default_ctx->execute(block, &result_column_id));
        // The expression wrote into an already existing column: nothing to replace.
        if (result_column_id < origin_column_num) {
            continue;
        }

        // call resize because the first column of _src_block_ptr may not be filled by reader,
        // so _src_block_ptr->rows() may return wrong result, cause the column created by
        // `ctx->execute()` has only one row.
        auto default_column = block->get_by_position(result_column_id).column;
        auto resizable = default_column->assume_mutable();
        resizable->resize(rows);
        // the result may be a ColumnConst, convert it to a normal column
        default_column = default_column->convert_to_full_column_if_const();

        const bool target_is_nullable = block->get_by_position(column_pos).type->is_nullable();
        block->replace_by_position(
                column_pos, target_is_nullable ? make_nullable(default_column) : default_column);
        positions_to_erase.insert(result_column_id);
    }
    block->erase(positions_to_erase);
    return Status::OK();
}

// Fill the row id column of `block` for the current batch.
// Does nothing when no row id iterator is configured. Otherwise the iterator is positioned
// at the row number reported by the row reader and asked for _batch->numElements values.
Status OrcReader::_fill_row_id_columns(Block* block) {
    const auto& row_id_iterator = _row_id_column_iterator_pair.first;
    if (row_id_iterator == nullptr) {
        return Status::OK();
    }

    RETURN_IF_ERROR(row_id_iterator->seek_to_ordinal(_row_reader->getRowNumber()));

    size_t fill_size = _batch->numElements;
    auto row_id_column =
            block->get_by_position(_row_id_column_iterator_pair.second).column->assume_mutable();
    RETURN_IF_ERROR(row_id_iterator->next_batch(&fill_size, row_id_column));
    return Status::OK();
}

// Populates `_system_properties` from the scan range and scan parameters.
void OrcReader::_init_system_properties() {
    // For compatibility: a file type set on the scan range takes precedence over
    // the one carried by the scan parameters.
    _system_properties.system_type =
            _scan_range.__isset.file_type ? _scan_range.file_type : _scan_params.file_type;

    _system_properties.hdfs_params = _scan_params.hdfs_params;
    _system_properties.properties = _scan_params.properties;

    // Broker addresses are optional; copy them only when they were provided.
    if (!_scan_params.__isset.broker_addresses) {
        return;
    }
    const auto& brokers = _scan_params.broker_addresses;
    _system_properties.broker_addresses.assign(brokers.begin(), brokers.end());
}

// Populates `_file_description` from the scan range.
// The file size is set to -1 when the scan range does not carry one, and the
// file system name is copied only when it is set.
void OrcReader::_init_file_description() {
    if (_scan_range.__isset.fs_name) {
        _file_description.fs_name = _scan_range.fs_name;
    }
    if (_scan_range.__isset.file_size) {
        _file_description.file_size = _scan_range.file_size;
    } else {
        _file_description.file_size = -1;
    }
    _file_description.path = _scan_range.path;
}

// Converts an ORC type descriptor into the corresponding Doris data type.
//
// - A nullptr `orc_type` is tolerated: a warning is logged and a STRING type is returned.
// - Scalar kinds are created through DataTypeFactory (the `true` argument is presumably
//   the nullable flag -- confirm against DataTypeFactory::create_data_type).
// - LIST / MAP / STRUCT are converted recursively and wrapped with make_nullable().
// - A MAP whose key/value subtypes are incomplete (e.g. because of column pruning) is
//   first looked up in `_column_id_to_file_type` to recover the complete file type. If
//   that fails, each missing subtype is converted as nullptr, i.e. falls back to STRING.
// - Any other kind throws an Exception carrying an InternalError status.
DataTypePtr OrcReader::convert_to_doris_type(const orc::Type* orc_type) {
    // Critical: check for nullptr BEFORE accessing any methods
    if (orc_type == nullptr) {
        LOG(WARNING) << "[OrcReader] ERROR: convert_to_doris_type called with nullptr orc_type! "
                        "Falling back to STRING";
        // Return a safe default type instead of crashing
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true);
    }

    switch (orc_type->getKind()) {
    case orc::TypeKind::BOOLEAN:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_BOOLEAN, true);
    case orc::TypeKind::BYTE:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_TINYINT, true);
    case orc::TypeKind::SHORT:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_SMALLINT, true);
    case orc::TypeKind::INT:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, true);
    case orc::TypeKind::LONG:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_BIGINT, true);
    case orc::TypeKind::FLOAT:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_FLOAT, true);
    case orc::TypeKind::DOUBLE:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DOUBLE, true);
    case orc::TypeKind::STRING:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true);
    case orc::TypeKind::BINARY:
        // BINARY maps to VARBINARY only when the scan params explicitly enable it.
        if (_scan_params.__isset.enable_mapping_varbinary &&
            _scan_params.enable_mapping_varbinary) {
            return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_VARBINARY,
                                                                true);
        } else {
            return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true);
        }
    case orc::TypeKind::TIMESTAMP:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATETIMEV2, true, 0,
                                                            6);
    case orc::TypeKind::DECIMAL:
        // A precision of 0 selects the hive 0.11 default precision/scale constants.
        return DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_DECIMAL128I, true,
                orc_type->getPrecision() == 0 ? decimal_precision_for_hive11
                                              : cast_set<int>(orc_type->getPrecision()),
                orc_type->getPrecision() == 0 ? decimal_scale_for_hive11
                                              : cast_set<int>(orc_type->getScale()));
    case orc::TypeKind::DATE:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATEV2, true);
    case orc::TypeKind::VARCHAR:
        return DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_VARCHAR, true, 0, 0,
                cast_set<int>(orc_type->getMaximumLength()));
    case orc::TypeKind::CHAR:
        return DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_CHAR, true, 0, 0, cast_set<int>(orc_type->getMaximumLength()));
    case orc::TypeKind::TIMESTAMP_INSTANT:
        return DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_DATETIMEV2, true, 0,
                                                            6);
    case orc::TypeKind::LIST: {
        return make_nullable(
                std::make_shared<DataTypeArray>(convert_to_doris_type(orc_type->getSubtype(0))));
    }
    case orc::TypeKind::MAP: {
        // Handle incomplete MAP type due to column pruning
        // If MAP doesn't have both key and value subtypes, try to find the complete type from file
        if (orc_type->getSubtypeCount() < 2 || orc_type->getSubtype(0) == nullptr ||
            orc_type->getSubtype(1) == nullptr) {
            // Try to find the complete type from _column_id_to_file_type
            uint64_t column_id = orc_type->getColumnId();
            VLOG_DEBUG << "[OrcReader] Detected incomplete MAP type: column_id=" << column_id
                       << ", subtype_count=" << orc_type->getSubtypeCount() << ", subtype(0)="
                       << (orc_type->getSubtypeCount() > 0 && orc_type->getSubtype(0) != nullptr
                                   ? "not null"
                                   : "null")
                       << ", subtype(1)="
                       << (orc_type->getSubtypeCount() > 1 && orc_type->getSubtype(1) != nullptr
                                   ? "not null"
                                   : "null");

            auto it = _column_id_to_file_type.find(column_id);
            if (it != _column_id_to_file_type.end() && it->second != nullptr) {
                const orc::Type* complete_type = it->second;
                VLOG_DEBUG << "[OrcReader] Found complete type in mapping: column_id=" << column_id
                           << ", complete_type_kind=" << static_cast<int>(complete_type->getKind())
                           << ", complete_subtype_count=" << complete_type->getSubtypeCount();

                // Print subtypes information
                for (uint64_t i = 0; i < complete_type->getSubtypeCount(); ++i) {
                    const orc::Type* subtype = complete_type->getSubtype(i);
                    VLOG_DEBUG << "[OrcReader]   complete_type->subtype[" << i << "]: "
                               << (subtype != nullptr
                                           ? ("kind=" +
                                              std::to_string(static_cast<int>(subtype->getKind())) +
                                              ", column_id=" +
                                              std::to_string(subtype->getColumnId()))
                                           : "nullptr");
                }

                if (complete_type->getKind() == orc::TypeKind::MAP &&
                    complete_type->getSubtypeCount() == 2) {
                    VLOG_DEBUG
                            << "[OrcReader] Using complete MAP type from file schema for column_id="
                            << column_id;

                    // Get subtypes with extra validation
                    const orc::Type* key_type = complete_type->getSubtype(0);
                    const orc::Type* value_type = complete_type->getSubtype(1);

                    VLOG_DEBUG << "[OrcReader] About to convert key_type: "
                               << (key_type != nullptr ? "not null" : "NULL");
                    VLOG_DEBUG << "[OrcReader] About to convert value_type: "
                               << (value_type != nullptr ? "not null" : "NULL");

                    // Use the complete type from file - a nullptr subtype is handled by the
                    // nullptr guard at the top of this function.
                    DataTypePtr key_doris_type = convert_to_doris_type(key_type);
                    VLOG_DEBUG << "[OrcReader] Successfully converted key_type";

                    DataTypePtr value_doris_type = convert_to_doris_type(value_type);
                    VLOG_DEBUG << "[OrcReader] Successfully converted value_type";

                    return make_nullable(
                            std::make_shared<DataTypeMap>(key_doris_type, value_doris_type));
                } else {
                    LOG(WARNING) << "[OrcReader] Warning: Complete type is not a valid MAP or has "
                                    "wrong subtype count";
                }
            } else {
                LOG(WARNING) << "[OrcReader] Warning: Could not find complete type in mapping for "
                                "column_id="
                             << column_id << ", mapping_size=" << _column_id_to_file_type.size();
            }
        }
        // Either the MAP is complete, or recovery from the file schema failed. In the latter
        // case the type may expose fewer than two subtypes, so never index past
        // getSubtypeCount(): a missing subtype is passed as nullptr and thereby converted
        // to the STRING fallback instead of reading out of range.
        const auto available_subtypes = orc_type->getSubtypeCount();
        const orc::Type* fallback_key_type =
                available_subtypes > 0 ? orc_type->getSubtype(0) : nullptr;
        const orc::Type* fallback_value_type =
                available_subtypes > 1 ? orc_type->getSubtype(1) : nullptr;
        return make_nullable(
                std::make_shared<DataTypeMap>(convert_to_doris_type(fallback_key_type),
                                              convert_to_doris_type(fallback_value_type)));
    }
    case orc::TypeKind::STRUCT: {
        // Convert every field recursively; field names are lower-cased.
        DataTypes res_data_types;
        std::vector<std::string> names;
        for (int i = 0; i < orc_type->getSubtypeCount(); ++i) {
            res_data_types.push_back(convert_to_doris_type(orc_type->getSubtype(i)));
            names.push_back(get_field_name_lower_case(orc_type, i));
        }
        return make_nullable(std::make_shared<DataTypeStruct>(res_data_types, names));
    }
    default:
        throw Exception(Status::InternalError("Orc type is not supported!"));
        return nullptr;
    }
}

// Reports the top-level columns of the ORC file.
// `name_to_type` receives one entry per top-level field of the file's root type,
// keyed by the field name and mapped to the converted Doris type.
// `missing_cols` receives every entry of `_missing_cols`.
// Always returns OK.
Status OrcReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
                              std::unordered_set<std::string>* missing_cols) {
    const auto& file_schema = _reader->getType();
    for (uint64_t field_idx = 0; field_idx < file_schema.getSubtypeCount(); ++field_idx) {
        name_to_type->emplace(file_schema.getFieldName(field_idx),
                              convert_to_doris_type(file_schema.getSubtype(field_idx)));
    }
    missing_cols->insert(_missing_cols.begin(), _missing_cols.end());
    return Status::OK();
}

// Hive pads ORC CHAR values with trailing spaces.
// https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/impala_char.html
//
// Returns the length of the first `size` bytes of `s` once trailing ' ' characters
// are dropped. Only the space character is stripped; the buffer is not modified.
static inline size_t trim_right(const char* s, size_t size) {
    size_t kept = size;
    for (; kept != 0; --kept) {
        if (s[kept - 1] != ' ') {
            break;
        }
    }
    return kept;
}

// Decodes an ORC string-like batch into `data_column`.
// The batch must be an orc::EncodedStringVectorBatch, otherwise an InternalError is
// returned. Depending on the batch's `isEncoded` flag the work is delegated to the
// dictionary-encoded or the non-dictionary-encoded decoder. The elapsed time is
// accounted to `_statistics.decode_value_time`.
template <bool is_filter>
Status OrcReader::_decode_string_column(const std::string& col_name,
                                        const MutableColumnPtr& data_column,
                                        const orc::TypeKind& type_kind,
                                        const orc::ColumnVectorBatch* cvb, size_t num_values) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    const auto* string_batch = dynamic_cast<const orc::EncodedStringVectorBatch*>(cvb);
    if (string_batch == nullptr) {
        return Status::InternalError(
                "Wrong data type for column '{}', expected EncodedStringVectorBatch", col_name);
    }
    return string_batch->isEncoded
                   ? _decode_string_dict_encoded_column<is_filter>(data_column, type_kind,
                                                                   string_batch, num_values)
                   : _decode_string_non_dict_encoded_column<is_filter>(data_column, type_kind,
                                                                       string_batch, num_values);
}

// Decodes a non-dictionary-encoded ORC string batch into `data_column`.
// One StringRef is collected per row and the whole batch is appended with a single
// insert_many_strings() call:
//  - null rows (only consulted when `cvb->hasNulls`) become an empty reference;
//  - for CHAR columns trailing spaces are stripped via trim_right();
//  - values whose length is not positive reference a static empty string instead of
//    the batch buffer.
template <bool is_filter>
Status OrcReader::_decode_string_non_dict_encoded_column(const MutableColumnPtr& data_column,
                                                         const orc::TypeKind& type_kind,
                                                         const orc::EncodedStringVectorBatch* cvb,
                                                         size_t num_values) {
    const static std::string empty_string;
    std::vector<StringRef> string_values;
    string_values.reserve(num_values);

    // ORC does not fill the slots of null rows in a new batch while the previous batch
    // has already been released, so the data pointer of a null row must not be used.
    // Flat types (int/long/timestamp/...) hold no pointers and need no such handling.
    auto append_null = [&]() { string_values.emplace_back(empty_string.data(), 0); };

    // Appends the value of one non-null row; `strip_padding` selects the CHAR handling.
    auto append_value = [&]<bool strip_padding>(size_t row) {
        const char* value = cvb->data[row];
        if constexpr (strip_padding) {
            // CHAR values may carry trailing padding spaces, strip them off.
            const size_t trimmed_length = trim_right(value, cvb->length[row]);
            string_values.emplace_back(trimmed_length > 0 ? value : empty_string.data(),
                                       trimmed_length);
        } else {
            const auto& raw_length = cvb->length[row];
            string_values.emplace_back(raw_length > 0 ? value : empty_string.data(),
                                       raw_length);
        }
    };

    // Walks all rows; the null-mask check is kept out of the loop when there are no nulls.
    auto decode_rows = [&]<bool strip_padding>() {
        if (!cvb->hasNulls) {
            for (size_t row = 0; row < num_values; ++row) {
                append_value.template operator()<strip_padding>(row);
            }
            return;
        }
        for (size_t row = 0; row < num_values; ++row) {
            if (cvb->notNull[row]) {
                append_value.template operator()<strip_padding>(row);
            } else {
                append_null();
            }
        }
    };

    if (type_kind == orc::TypeKind::CHAR) {
        decode_rows.template operator()<true>();
    } else {
        decode_rows.template operator()<false>();
    }

    if (!string_values.empty()) {
        data_column->insert_many_strings(string_values.data(), num_values);
    }
    return Status::OK();
}

// Decodes a dictionary-encoded ORC string batch into `data_column`.
// Every row's dictionary index is resolved through `cvb->dictionary`; the resulting
// references are appended with a single insert_many_strings_overflow() call together
// with the largest value length seen.
//  - null rows (only consulted when `cvb->hasNulls`) become an empty reference;
//  - when `is_filter` is set, rows whose `_filter` entry is zero are not resolved and
//    become an empty reference as well;
//  - for CHAR columns trailing spaces are stripped via trim_right().
template <bool is_filter>
Status OrcReader::_decode_string_dict_encoded_column(const MutableColumnPtr& data_column,
                                                     const orc::TypeKind& type_kind,
                                                     const orc::EncodedStringVectorBatch* cvb,
                                                     size_t num_values) {
    std::vector<StringRef> string_values;
    string_values.reserve(num_values);
    size_t max_value_length = 0;

    UInt8* __restrict filter_data = nullptr;
    if constexpr (is_filter) {
        filter_data = _filter->data();
    }

    // Placeholder used for null rows and for rows that are filtered out.
    auto append_empty = [&]() { string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0); };

    // Resolves one non-null row through the dictionary and appends its value.
    auto append_dict_value = [&]<bool is_char>(size_t row) {
        if constexpr (is_filter) {
            if (!filter_data[row]) {
                append_empty();
                return;
            }
        }

        char* val_ptr;
        int64_t length;
        cvb->dictionary->getValueByIndex(cvb->index.data()[row], val_ptr, length);

        if constexpr (is_char) {
            length = trim_right(val_ptr, length);
        }

        // Track the longest value; it is handed to insert_many_strings_overflow below.
        if (length > max_value_length) {
            max_value_length = length;
        }

        string_values.emplace_back((length > 0) ? val_ptr : EMPTY_STRING_FOR_OVERFLOW, length);
    };

    // Walks all rows; the null-mask check is kept out of the loop when there are no nulls.
    auto decode_rows = [&]<bool is_char>() {
        if (!cvb->hasNulls) {
            for (size_t row = 0; row < num_values; ++row) {
                append_dict_value.template operator()<is_char>(row);
            }
            return;
        }
        for (size_t row = 0; row < num_values; ++row) {
            if (cvb->notNull[row]) {
                append_dict_value.template operator()<is_char>(row);
            } else {
                append_empty();
            }
        }
    };

    if (type_kind == orc::TypeKind::CHAR) {
        decode_rows.template operator()<true>();
    } else {
        decode_rows.template operator()<false>();
    }

    if (!string_values.empty()) {
        data_column->insert_many_strings_overflow(string_values.data(), string_values.size(),
                                                  max_value_length);
    }
    return Status::OK();
}

// Decodes a batch into an Int32 Doris column.
//  - orc::LongVectorBatch: decoded as a flat TYPE_INT column.
//  - orc::EncodedStringVectorBatch: the batch's `index` entries (one per row) are
//    appended to the column, narrowed to Int32.
//  - anything else: DCHECK failure in debug builds and an InternalError otherwise.
// The elapsed time is accounted to `_statistics.decode_value_time`.
template <bool is_filter>
Status OrcReader::_decode_int32_column(const std::string& col_name,
                                       const MutableColumnPtr& data_column,
                                       const orc::ColumnVectorBatch* cvb, size_t num_values) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    if (dynamic_cast<const orc::LongVectorBatch*>(cvb) != nullptr) {
        return _decode_flat_column<TYPE_INT, orc::LongVectorBatch>(col_name, data_column, cvb,
                                                                   num_values);
    }

    const auto* dict_batch = dynamic_cast<const orc::EncodedStringVectorBatch*>(cvb);
    if (dict_batch == nullptr) {
        DCHECK(false) << "Bad ColumnVectorBatch type.";
        return Status::InternalError("Bad ColumnVectorBatch type.");
    }

    const auto* dict_codes = dict_batch->index.data();
    auto& int_values = static_cast<ColumnInt32&>(*data_column).get_data();
    const auto first_new_row = int_values.size();
    int_values.resize(first_new_row + num_values);
    for (size_t row = 0; row < num_values; ++row) {
        int_values[first_new_row + row] = static_cast<Int32>(dict_codes[row]);
    }
    return Status::OK();
}

// Appends `num_values` offsets to `doris_offsets`, derived from `orc_offsets`.
// Each ORC offset is rebased: the batch's first ORC offset is subtracted and the last
// offset already present in `doris_offsets` is added. `*element_size` receives the
// number of nested elements covered by the batch (0 when `num_values` is 0).
// Returns InternalError when `orc_offsets` holds fewer than `num_values + 1` entries.
// The elapsed time is accounted to `_statistics.decode_value_time`.
Status OrcReader::_fill_doris_array_offsets(const std::string& col_name,
                                            ColumnArray::Offsets64& doris_offsets,
                                            const orc::DataBuffer<int64_t>& orc_offsets,
                                            size_t num_values, size_t* element_size) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    if (num_values == 0) {
        *element_size = 0;
        return Status::OK();
    }

    // The const variable uses a non-const method from a third-party dependency
    // without modification, so const_cast can be used.
    const auto available_offsets = const_cast<orc::DataBuffer<int64_t>&>(orc_offsets).size();
    if (available_offsets < num_values + 1) {
        return Status::InternalError("Wrong array offsets in orc file for column '{}'",
                                     col_name);
    }

    auto doris_base = doris_offsets.back();
    auto orc_base = orc_offsets[0];
    for (size_t row = 1; row < num_values + 1; ++row) {
        doris_offsets.emplace_back(doris_base + orc_offsets[row] - orc_base);
    }
    *element_size = orc_offsets[num_values] - orc_base;
    return Status::OK();
}

// Decodes `num_values` rows of the ORC batch `cvb` into the Doris column `data_column`,
// dispatching on the primitive type of `data_type`.
//
// - Flat types listed in FOR_FLAT_ORC_COLUMNS, INT, decimals, date/datetime and the
//   string-like types are forwarded to the matching `_decode_*` helper.
// - ARRAY / MAP: the offsets are converted with _fill_doris_array_offsets() and the
//   nested columns are decoded recursively through _orc_column_to_doris_column().
// - STRUCT: Doris fields are matched to ORC fields by lower-cased file column name via
//   `root_node`; unmatched fields must be nullable and are filled with defaults.
//
// Returns InternalError when the ORC type kind or the concrete batch class does not
// match the Doris type, or when the Doris type is not handled here.
template <bool is_filter>
Status OrcReader::_fill_doris_data_column(const std::string& col_name,
                                          MutableColumnPtr& data_column,
                                          const DataTypePtr& data_type,
                                          std::shared_ptr<TableSchemaChangeHelper::Node> root_node,
                                          const orc::Type* orc_column_type,
                                          const orc::ColumnVectorBatch* cvb, size_t num_values) {
    auto logical_type = data_type->get_primitive_type();

    switch (logical_type) {
#define DISPATCH(FlatType, CppType, OrcColumnType) \
    case FlatType:                                 \
        return _decode_flat_column<FlatType, OrcColumnType>(col_name, data_column, cvb, num_values);
        FOR_FLAT_ORC_COLUMNS(DISPATCH)
#undef DISPATCH
    case PrimitiveType::TYPE_INT:
        return _decode_int32_column<is_filter>(col_name, data_column, cvb, num_values);
    case PrimitiveType::TYPE_DECIMAL32:
        return _decode_decimal_column<TYPE_DECIMAL32, is_filter>(col_name, data_column, data_type,
                                                                 cvb, num_values);
    case PrimitiveType::TYPE_DECIMAL64:
        return _decode_decimal_column<TYPE_DECIMAL64, is_filter>(col_name, data_column, data_type,
                                                                 cvb, num_values);
    case PrimitiveType::TYPE_DECIMALV2:
        return _decode_decimal_column<TYPE_DECIMALV2, is_filter>(col_name, data_column, data_type,
                                                                 cvb, num_values);
    case PrimitiveType::TYPE_DECIMAL128I:
        return _decode_decimal_column<TYPE_DECIMAL128I, is_filter>(col_name, data_column, data_type,
                                                                   cvb, num_values);
    case PrimitiveType::TYPE_DATEV2:
        return _decode_time_column<DateV2Value<DateV2ValueType>, TYPE_DATEV2, orc::LongVectorBatch,
                                   is_filter>(col_name, data_column, cvb, num_values);
    case PrimitiveType::TYPE_DATETIMEV2:
        return _decode_time_column<DateV2Value<DateTimeV2ValueType>, TYPE_DATETIMEV2,
                                   orc::TimestampVectorBatch, is_filter>(col_name, data_column, cvb,
                                                                         num_values);
    case PrimitiveType::TYPE_STRING:
    case PrimitiveType::TYPE_VARCHAR:
    case PrimitiveType::TYPE_CHAR:
        return _decode_string_column<is_filter>(col_name, data_column, orc_column_type->getKind(),
                                                cvb, num_values);
    case PrimitiveType::TYPE_VARBINARY:
        // case BINARY:    binary type still use StringVectorBatch, so here we just call _decode_string_column
        // return encoded ? std::make_unique<EncodedStringVectorBatch>(capacity, memoryPool)
        //                : std::make_unique<StringVectorBatch>(capacity, memoryPool);
        return _decode_string_column<is_filter>(col_name, data_column, orc_column_type->getKind(),
                                                cvb, num_values);
    case PrimitiveType::TYPE_ARRAY: {
        if (orc_column_type->getKind() != orc::TypeKind::LIST) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected list, actual {}", col_name,
                    orc_column_type->getKind());
        }
        const auto* orc_list = dynamic_cast<const orc::ListVectorBatch*>(cvb);
        // The batch class is not implied by the type kind check above; refuse a
        // mismatching batch instead of dereferencing a null pointer.
        if (orc_list == nullptr) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected ListVectorBatch", col_name);
        }
        auto& doris_offsets = static_cast<ColumnArray&>(*data_column).get_offsets();
        const auto& orc_offsets = orc_list->offsets;
        size_t element_size = 0;
        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_offsets, orc_offsets, num_values,
                                                  &element_size));
        const DataTypePtr& nested_type =
                reinterpret_cast<const DataTypeArray*>(remove_nullable(data_type).get())
                        ->get_nested_type();
        const orc::Type* nested_orc_type = orc_column_type->getSubtype(0);
        std::string element_name = col_name + ".element";
        // Nested elements are always decoded with is_filter = false.
        return _orc_column_to_doris_column<false>(
                element_name, static_cast<ColumnArray&>(*data_column).get_data_ptr(), nested_type,
                root_node->get_element_node(), nested_orc_type, orc_list->elements.get(),
                element_size);
    }
    case PrimitiveType::TYPE_MAP: {
        if (orc_column_type->getKind() != orc::TypeKind::MAP) {
            return Status::InternalError("Wrong data type for column '{}', expected map, actual {}",
                                         col_name, orc_column_type->getKind());
        }
        const auto* orc_map = dynamic_cast<const orc::MapVectorBatch*>(cvb);
        // Refuse a mismatching batch instead of dereferencing a null pointer.
        if (orc_map == nullptr) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected MapVectorBatch", col_name);
        }
        auto& doris_map = static_cast<ColumnMap&>(*data_column);
        size_t element_size = 0;
        RETURN_IF_ERROR(_fill_doris_array_offsets(col_name, doris_map.get_offsets(),
                                                  orc_map->offsets, num_values, &element_size));
        const DataTypePtr& doris_key_type =
                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
                        ->get_key_type();
        const DataTypePtr& doris_value_type =
                reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())
                        ->get_value_type();

        // Get ORC key and value types with null checks
        const orc::Type* orc_key_type = orc_column_type->getSubtype(0);
        const orc::Type* orc_value_type = orc_column_type->getSubtype(1);

        VLOG_DEBUG << "[OrcReader] MAP column '" << col_name
                   << "': orc_key_type=" << (orc_key_type != nullptr ? "not null" : "NULL")
                   << ", orc_value_type=" << (orc_value_type != nullptr ? "not null" : "NULL")
                   << ", element_size=" << element_size;

        // Handle incomplete MAP type - if key or value type is nullptr, try to recover from mapping
        bool key_is_missing = (orc_key_type == nullptr);
        bool value_is_missing = (orc_value_type == nullptr);

        // NOTE(review): the *_is_missing flags are not cleared after a successful recovery
        // (the assignments are commented out), so a recovered subtype is only logged and
        // the column is still filled with defaults below -- confirm this is intended.
        if (key_is_missing || value_is_missing) {
            VLOG_DEBUG << "[OrcReader] Detected incomplete MAP subtypes for column '" << col_name
                       << "', attempting to recover from mapping...";

            uint64_t column_id = orc_column_type->getColumnId();
            auto it = _column_id_to_file_type.find(column_id);
            if (it != _column_id_to_file_type.end() && it->second != nullptr) {
                const orc::Type* complete_map_type = it->second;
                if (complete_map_type->getKind() == orc::TypeKind::MAP &&
                    complete_map_type->getSubtypeCount() == 2) {
                    if (key_is_missing) {
                        orc_key_type = complete_map_type->getSubtype(0);
                        if (orc_key_type != nullptr) {
                            // key_is_missing = false;
                            VLOG_DEBUG << "[OrcReader] Recovered key type from mapping for column '"
                                       << col_name << "'";
                        }
                    }
                    if (value_is_missing) {
                        orc_value_type = complete_map_type->getSubtype(1);
                        if (orc_value_type != nullptr) {
                            // value_is_missing = false;
                            VLOG_DEBUG
                                    << "[OrcReader] Recovered value type from mapping for column '"
                                    << col_name << "'";
                        }
                    }
                }
            }
        }

        ColumnPtr& doris_key_column = doris_map.get_keys_ptr();
        ColumnPtr& doris_value_column = doris_map.get_values_ptr();
        std::string key_col_name = col_name + ".key";
        std::string value_col_name = col_name + ".value";

        // Handle key column: if still missing, fill with default values
        if (key_is_missing) {
            // Fill key column with default values (nulls or empty values)
            auto mutable_key_column = doris_key_column->assume_mutable();
            if (mutable_key_column->is_nullable()) {
                auto* nullable_column = static_cast<ColumnNullable*>(mutable_key_column.get());
                nullable_column->insert_many_defaults(element_size);
            } else {
                mutable_key_column->insert_many_defaults(element_size);
            }
        } else {
            // Normal processing: convert ORC column to Doris column
            RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
                    key_col_name, doris_key_column, doris_key_type, root_node->get_key_node(),
                    orc_key_type, orc_map->keys.get(), element_size));
        }

        // Handle value column: if still missing, fill with default values
        if (value_is_missing) {
            // Fill value column with default values (nulls or empty values)
            auto mutable_value_column = doris_value_column->assume_mutable();
            if (mutable_value_column->is_nullable()) {
                auto* nullable_column = static_cast<ColumnNullable*>(mutable_value_column.get());
                nullable_column->insert_many_defaults(element_size);
            } else {
                mutable_value_column->insert_many_defaults(element_size);
            }
        } else {
            // Normal processing: convert ORC column to Doris column
            RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
                    value_col_name, doris_value_column, doris_value_type,
                    root_node->get_value_node(), orc_value_type, orc_map->elements.get(),
                    element_size));
        }
        return Status::OK();
    }
    case PrimitiveType::TYPE_STRUCT: {
        if (orc_column_type->getKind() != orc::TypeKind::STRUCT) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected struct, actual {}", col_name,
                    orc_column_type->getKind());
        }
        const auto* orc_struct = dynamic_cast<const orc::StructVectorBatch*>(cvb);
        // Refuse a mismatching batch instead of dereferencing a null pointer.
        if (orc_struct == nullptr) {
            return Status::InternalError(
                    "Wrong data type for column '{}', expected StructVectorBatch", col_name);
        }
        auto& doris_struct = static_cast<ColumnStruct&>(*data_column);
        // read_fields: Doris field position -> ORC field index.
        std::map<int, int> read_fields;
        // missing_fields: Doris field positions without a counterpart in the ORC struct.
        std::set<int> missing_fields;
        const auto* doris_struct_type =
                assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get());

        // Build ORC field name to index map for faster lookup
        std::unordered_map<std::string, int> orc_field_name_to_idx;
        for (int j = 0; j < orc_column_type->getSubtypeCount(); ++j) {
            std::string field_name = orc_column_type->getFieldName(j);
            std::transform(field_name.begin(), field_name.end(), field_name.begin(), ::tolower);
            orc_field_name_to_idx[field_name] = j;
        }

        for (int i = 0; i < doris_struct.tuple_size(); ++i) {
            const auto& table_column_name = doris_struct_type->get_name_by_position(i);
            if (!root_node->children_column_exists(table_column_name)) {
                missing_fields.insert(i);
                continue;
            }
            // Names are compared case-insensitively (both sides lower-cased).
            const auto& file_column_name = root_node->children_file_column_name(table_column_name);
            std::string file_column_name_lower = file_column_name;
            std::transform(file_column_name_lower.begin(), file_column_name_lower.end(),
                           file_column_name_lower.begin(), ::tolower);

            auto it = orc_field_name_to_idx.find(file_column_name_lower);
            if (it != orc_field_name_to_idx.end()) {
                read_fields[i] = it->second;
                VLOG_DEBUG << "[OrcReader] Found field mapping: doris_field[" << i
                           << "] -> orc_field[" << it->second
                           << "], table_column: " << table_column_name
                           << ", file_column: " << file_column_name_lower;
            } else {
                missing_fields.insert(i);
                VLOG_DEBUG << "[OrcReader] Missing field: doris_field[" << i
                           << "], table_column: " << table_column_name
                           << ", file_column: " << file_column_name_lower
                           << " (not found in ORC file)";
            }
        }

        // Fields absent from the file can only be represented when they are nullable.
        for (int missing_field : missing_fields) {
            ColumnPtr& doris_field = doris_struct.get_column_ptr(missing_field);
            if (!doris_field->is_nullable()) {
                return Status::InternalError(
                        "Child field of '{}' is not nullable, but is missing in orc file",
                        col_name);
            }
            reinterpret_cast<ColumnNullable*>(doris_field->assume_mutable().get())
                    ->insert_many_defaults(num_values);
        }

        // Struct fields share the parent's row count, so is_filter is propagated.
        for (auto read_field : read_fields) {
            orc::ColumnVectorBatch* orc_field = orc_struct->fields[read_field.second];
            const orc::Type* orc_type = orc_column_type->getSubtype(read_field.second);
            std::string field_name =
                    col_name + "." + orc_column_type->getFieldName(read_field.second);
            ColumnPtr& doris_field = doris_struct.get_column_ptr(read_field.first);
            const DataTypePtr& doris_type = doris_struct_type->get_element(read_field.first);
            RETURN_IF_ERROR(_orc_column_to_doris_column<is_filter>(
                    field_name, doris_field, doris_type,
                    root_node->get_children_node(
                            doris_struct_type->get_name_by_position(read_field.first)),
                    orc_type, orc_field, num_values));
        }
        return Status::OK();
    }
    default:
        break;
    }
    return Status::InternalError("Unsupported type for column '{}'", col_name);
}

// Decodes one ORC column vector batch and appends its rows to `doris_column`, going through a
// cached type converter so that a file column whose type differs from the table column is
// converted on the fly.
//
// Parameters:
//   col_name        - table column name (a dotted path for nested struct fields). It is also the
//                     key used to cache the converter and to match against `_dict_filter_cols`.
//   doris_column    - destination column; `num_values` rows are appended to it.
//   data_type       - Doris type of the destination column.
//   root_node       - schema-change node forwarded to `_fill_doris_data_column`.
//   orc_column_type - ORC type of the file column. When it is nullptr nothing is decoded and
//                     `num_values` default values are appended instead.
//   cvb             - ORC batch holding the values to decode.
//   num_values      - number of rows to append.
//
// Returns an InternalError when the required type conversion is not supported or when a
// non-nullable destination receives null values; otherwise the status of the fill/convert steps.
template <bool is_filter>
Status OrcReader::_orc_column_to_doris_column(
        const std::string& col_name, ColumnPtr& doris_column, const DataTypePtr& data_type,
        std::shared_ptr<TableSchemaChangeHelper::Node> root_node, const orc::Type* orc_column_type,
        const orc::ColumnVectorBatch* cvb, size_t num_values) {
    DataTypePtr resolved_type;
    ColumnPtr resolved_column;
    MutableColumnPtr data_column;
    if (orc_column_type != nullptr) {
        auto src_type = convert_to_doris_type(orc_column_type);
        bool is_dict_filter_col = false;
        for (const std::pair<std::string, int>& dict_col : _dict_filter_cols) {
            if (col_name == dict_col.first) {
                // Dictionary-filtered columns are read as nullable INT dictionary codes.
                src_type =
                        DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_INT, true);
                is_dict_filter_col = true;
                break;
            }
        }
        // If the column can be dictionary filtered, there will be two types.
        // It may be plain or a dictionary, because the same field in different stripes may have different types.
        // Here we use the $dict_ prefix to represent the dictionary type converter.
        auto converter_key = !is_dict_filter_col ? col_name : fmt::format("$dict_{}", col_name);

        if (!_converters.contains(converter_key)) {
            std::unique_ptr<converter::ColumnTypeConverter> converter =
                    converter::ColumnTypeConverter::get_converter(src_type, data_type,
                                                                  converter::FileFormat::ORC);
            if (!converter->support()) {
                // Both the column name and the converter's reason must have a placeholder,
                // otherwise the reason is silently dropped from the message.
                return Status::InternalError(
                        "The column type of '{}' has changed and is not supported: {}", col_name,
                        converter->get_error_msg());
            }
            // cache the converter so that later batches can reuse it
            _converters[converter_key] = std::move(converter);
        }
        converter::ColumnTypeConverter* converter = _converters[converter_key].get();
        resolved_column = converter->get_column(src_type, doris_column, data_type);
        resolved_type = converter->get_type();

        if (resolved_column->is_nullable()) {
            SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
            auto* nullable_column =
                    static_cast<ColumnNullable*>(resolved_column->assume_mutable().get());
            data_column = nullable_column->get_nested_column_ptr();

            // Append `num_values` entries to the null map. ORC stores a "not null" flag per row
            // while the Doris null map stores "is null", hence the negation.
            NullMap& map_data_column = nullable_column->get_null_map_data();
            auto origin_size = map_data_column.size();
            map_data_column.resize(origin_size + num_values);
            if (cvb->hasNulls) {
                const auto* cvb_nulls = cvb->notNull.data();
                for (size_t i = 0; i < num_values; ++i) {
                    map_data_column[origin_size + i] = !cvb_nulls[i];
                }
            } else {
                memset(map_data_column.data() + origin_size, 0, num_values);
            }
        } else {
            if (cvb->hasNulls) {
                return Status::InternalError("Not nullable column {} has null values in orc file",
                                             col_name);
            }
            data_column = resolved_column->assume_mutable();
        }

        RETURN_IF_ERROR(_fill_doris_data_column<is_filter>(
                col_name, data_column, remove_nullable(resolved_type), root_node, orc_column_type,
                cvb, num_values));
        // resolve schema change
        auto converted_column = doris_column->assume_mutable();
        return converter->convert(resolved_column, converted_column);
    } else {
        // No ORC type for this column: pad the destination with default values.
        auto mutable_column = doris_column->assume_mutable();
        if (mutable_column->is_nullable()) {
            auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
            nullable_column->insert_many_defaults(num_values);
        } else {
            mutable_column->insert_many_defaults(num_values);
        }
    }

    return Status::OK();
}

// Returns the name of the `pos`-th field of `orc_type`, converted to lower case.
std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int pos) {
    std::string name = orc_type->getFieldName(pos);
    // std::tolower has undefined behavior for arguments that are neither EOF nor representable
    // as unsigned char, which is what a negative plain `char` (non-ASCII byte) turns into. Taking
    // each byte as unsigned char keeps the call well defined.
    std::transform(name.begin(), name.end(), name.begin(),
                   [](unsigned char ch) { return static_cast<char>(std::tolower(ch)); });
    return name;
}

// Public entry point for reading the next block.
//
// Delegates the actual read to `_get_next_block_impl`. Once end-of-file is reported, the row group
// counters and the read-row statistic are published. Finally, any error recorded by the ORC filter
// or the string dictionary filter is returned to the caller.
Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_get_next_block_impl(block, read_rows, eof));

    const bool reached_end = *eof;
    if (reached_end) {
        // Publish the reader metrics exactly when the reader reports end-of-file.
        COUNTER_UPDATE(_orc_profile.selected_row_group_count,
                       _reader_metrics.SelectedRowGroupCount);
        COUNTER_UPDATE(_orc_profile.evaluated_row_group_count,
                       _reader_metrics.EvaluatedRowGroupCount);
        if (_io_ctx != nullptr) {
            _io_ctx->file_reader_stats->read_rows += _reader_metrics.ReadRowCount;
        }
    }

    // Report the status held by the row filter first, then the dictionary filter.
    if (_orc_filter != nullptr) {
        RETURN_IF_ERROR(_orc_filter->get_status());
    }
    if (_string_dict_filter != nullptr) {
        RETURN_IF_ERROR(_string_dict_filter->get_status());
    }
    return Status::OK();
}

// Reads the next batch from the ORC row reader and materializes it into `block`.
//
// Outputs:
//   *read_rows - number of rows in `block` when the function returns on one of the regular paths.
//   *eof       - set to true once no further data is available (or the scan was asked to stop).
//
// Returns an InternalError when the ORC reader throws, when a read column cannot be located in
// the file, or when a dict filter column is missing from the block.
//
// Three paths are handled:
//   1. COUNT push-down: the block columns are only resized, nothing is decoded.
//   2. Lazy read (`_lazy_read_ctx.can_lazy_read`): only the lazy columns are converted here and
//      the block is filtered with `_filter`, which OrcReader::filter() assigns.
//   3. Plain read: every read column is converted, then the conjuncts and delete filters are
//      evaluated on the block.
//
// NOTE(review): `*read_rows` is not written when `_seek_to_read_one_line()` returns false, nor on
// the `can_filter_all` early return of the plain path; callers presumably pre-initialize it —
// confirm.
Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eof) {
    // The IO context asked the scan to stop: report an empty end-of-file.
    if (_io_ctx && _io_ctx->should_stop) {
        *eof = true;
        *read_rows = 0;
        return Status::OK();
    }
    // COUNT push-down: emit at most `_batch_size` rows per call by resizing the columns, and
    // report eof once the remaining row counter reaches zero.
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
        auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);

        set_remaining_rows(get_remaining_rows() - rows);
        auto mutate_columns = block->mutate_columns();
        for (auto& col : mutate_columns) {
            col->resize(rows);
        }
        block->set_columns(std::move(mutate_columns));
        *read_rows = rows;
        if (get_remaining_rows() == 0) {
            *eof = true;
        }
        return Status::OK();
    }

    // A false return is treated as end-of-file.
    if (!_seek_to_read_one_line()) {
        *eof = true;
        return Status::OK();
    }

    if (_lazy_read_ctx.can_lazy_read) {
        // Snapshot the column count up front: every column present now is filtered below, and
        // anything appended afterwards is erased again.
        std::vector<uint32_t> columns_to_filter;
        int column_to_keep = block->columns();
        columns_to_filter.resize(column_to_keep);
        for (uint32_t i = 0; i < column_to_keep; ++i) {
            columns_to_filter[i] = i;
        }
        uint64_t rr;
        SCOPED_RAW_TIMER(&_statistics.column_read_time);
        {
            SCOPED_RAW_TIMER(&_statistics.get_batch_time);
            // reset decimal_scale_params_index;
            _decimal_scale_params_index = 0;
            try {
                // NOTE(review): `block` is presumably handed back to OrcReader::filter() as its
                // `arg` parameter while the batch is being read — confirm.
                rr = _row_reader->nextBatch(*_batch, block);
                if (rr == 0 || _batch->numElements == 0) {
                    *eof = true;
                    *read_rows = 0;
                    return Status::OK();
                }
            } catch (std::exception& e) {
                std::string _err_msg = e.what();
                // A "stop" exception while the scan is being stopped is a normal termination,
                // not an error.
                if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
                    block->clear_column_data();
                    *eof = true;
                    *read_rows = 0;
                    return Status::OK();
                }
                return Status::InternalError("Orc row reader nextBatch failed. reason = {}",
                                             _err_msg);
            }
        }

        std::vector<orc::ColumnVectorBatch*> batch_vec;
        _fill_batch_vec(batch_vec, _batch.get(), 0);

        // Convert only the lazily read columns here.
        // NOTE(review): predicate columns are presumably already filled by filter() — confirm.
        // todo: maybe do not need to build name to index map every time
        auto name_to_pos_map = block->get_name_to_pos_map();
        for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
            auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
            auto& column_ptr = column_with_type_and_name.column;
            auto& column_type = column_with_type_and_name.type;
            auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
            auto orc_col_idx = _colname_to_idx.find(file_column_name);
            if (orc_col_idx == _colname_to_idx.end()) {
                return Status::InternalError("Wrong read column '{}' in orc file", col_name);
            }
            RETURN_IF_ERROR(_orc_column_to_doris_column<true>(
                    col_name, column_ptr, column_type,
                    _table_info_node_ptr->get_children_node(col_name), _type_map[file_column_name],
                    batch_vec[orc_col_idx->second], _batch->numElements));
        }

        RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
                                                _lazy_read_ctx.partition_columns));
        RETURN_IF_ERROR(
                _fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns));

        RETURN_IF_ERROR(_fill_row_id_columns(block));

        // An empty block after filling is reported as end-of-file.
        if (block->rows() == 0) {
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
            *eof = true;
            *read_rows = 0;
            return Status::OK();
        }
        {
            SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
            // `_filter` is the per-row result assigned in OrcReader::filter().
            _execute_filter_position_delete_rowids(*_filter);
            {
                SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
                RETURN_IF_CATCH_EXCEPTION(
                        Block::filter_block_internal(block, columns_to_filter, *_filter));
            }
            Block::erase_useless_column(block, column_to_keep);
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
            *read_rows = block->rows();
        }
    } else {
        uint64_t rr;
        SCOPED_RAW_TIMER(&_statistics.column_read_time);
        {
            SCOPED_RAW_TIMER(&_statistics.get_batch_time);
            // reset decimal_scale_params_index;
            _decimal_scale_params_index = 0;
            try {
                rr = _row_reader->nextBatch(*_batch, block);
                if (rr == 0 || _batch->numElements == 0) {
                    *eof = true;
                    *read_rows = 0;
                    return Status::OK();
                }
            } catch (std::exception& e) {
                std::string _err_msg = e.what();
                // Same handling as in the lazy-read path: "stop" during shutdown is not an error.
                if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
                    block->clear_column_data();
                    *eof = true;
                    *read_rows = 0;
                    return Status::OK();
                }
                return Status::InternalError("Orc row reader nextBatch failed. reason = {}",
                                             _err_msg);
            }
        }

        // todo: maybe do not need to build name to index map every time
        auto name_to_pos_map = block->get_name_to_pos_map();
        // Once per conversion cycle (guarded by `_dict_cols_has_converted`), replace every dict
        // filter column in the block with an empty Int32 column, wrapped in a nullable column
        // when the original type is nullable.
        if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
            for (auto& dict_filter_cols : _dict_filter_cols) {
                MutableColumnPtr dict_col_ptr = ColumnInt32::create();
                if (!name_to_pos_map.contains(dict_filter_cols.first)) {
                    return Status::InternalError(
                            "Failed to find dict filter column '{}' in block {}",
                            dict_filter_cols.first, block->dump_structure());
                }
                auto pos = name_to_pos_map[dict_filter_cols.first];
                auto& column_with_type_and_name = block->get_by_position(pos);
                auto& column_type = column_with_type_and_name.type;
                if (column_type->is_nullable()) {
                    block->get_by_position(pos).type =
                            std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
                    block->replace_by_position(
                            pos,
                            ColumnNullable::create(std::move(dict_col_ptr),
                                                   ColumnUInt8::create(dict_col_ptr->size(), 0)));
                } else {
                    block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
                    block->replace_by_position(pos, std::move(dict_col_ptr));
                }
            }
            _dict_cols_has_converted = true;
        }

        std::vector<orc::ColumnVectorBatch*> batch_vec;
        _fill_batch_vec(batch_vec, _batch.get(), 0);

        // Convert every read column of the batch into the block.
        for (auto& col_name : _lazy_read_ctx.all_read_columns) {
            auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[col_name]);
            auto& column_ptr = column_with_type_and_name.column;
            auto& column_type = column_with_type_and_name.type;
            auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
            auto orc_col_idx = _colname_to_idx.find(file_column_name);
            if (orc_col_idx == _colname_to_idx.end()) {
                return Status::InternalError("Wrong read column '{}' in orc file", col_name);
            }
            RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
                    col_name, column_ptr, column_type,
                    _table_info_node_ptr->get_children_node(col_name), _type_map[file_column_name],
                    batch_vec[orc_col_idx->second], _batch->numElements));
        }

        RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
                                                _lazy_read_ctx.partition_columns));
        RETURN_IF_ERROR(
                _fill_missing_columns(block, _batch->numElements, _lazy_read_ctx.missing_columns));

        RETURN_IF_ERROR(_fill_row_id_columns(block));

        // An empty block after filling is reported as end-of-file.
        if (block->rows() == 0) {
            RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
            *eof = true;
            *read_rows = 0;
            return Status::OK();
        }

        {
            SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
            // Rebuilds `_delete_rows_filter_ptr` when transactional delete rows are present.
            _build_delete_row_filter(block, _batch->numElements);

            // Snapshot the column count before conjunct evaluation: these columns are filtered,
            // anything appended afterwards is erased again.
            std::vector<uint32_t> columns_to_filter;
            int column_to_keep = block->columns();
            columns_to_filter.resize(column_to_keep);
            for (uint32_t i = 0; i < column_to_keep; ++i) {
                columns_to_filter[i] = i;
            }
            if (!_lazy_read_ctx.conjuncts.empty()) {
                // Evaluate the generic, dict and non-dict conjuncts together, combined with the
                // delete-row filter when there is one.
                VExprContextSPtrs filter_conjuncts;
                filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
                                        _filter_conjuncts.end());
                for (auto& conjunct : _dict_filter_conjuncts) {
                    filter_conjuncts.emplace_back(conjunct);
                }
                for (auto& conjunct : _non_dict_filter_conjuncts) {
                    filter_conjuncts.emplace_back(conjunct);
                }
                std::vector<IColumn::Filter*> filters;
                if (_delete_rows_filter_ptr) {
                    filters.push_back(_delete_rows_filter_ptr.get());
                }
                IColumn::Filter result_filter(block->rows(), 1);
                bool can_filter_all = false;
                RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
                        filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
                // Every row was rejected: clear the columns instead of filtering them.
                if (can_filter_all) {
                    for (auto& col : columns_to_filter) {
                        std::move(*block->get_by_position(col).column).assume_mutable()->clear();
                    }
                    Block::erase_useless_column(block, column_to_keep);
                    return _convert_dict_cols_to_string_cols(block, &batch_vec);
                }
                _execute_filter_position_delete_rowids(result_filter);
                RETURN_IF_CATCH_EXCEPTION(
                        Block::filter_block_internal(block, columns_to_filter, result_filter));
                Block::erase_useless_column(block, column_to_keep);
            } else {
                // No conjuncts: only the delete-row filter or the position-delete row ids (if
                // any) are applied.
                if (_delete_rows_filter_ptr) {
                    _execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
                    RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
                            block, columns_to_filter, (*_delete_rows_filter_ptr)));
                } else if (_position_delete_ordered_rowids != nullptr) {
                    std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
                    _execute_filter_position_delete_rowids(*filter);
                    RETURN_IF_CATCH_EXCEPTION(
                            Block::filter_block_internal(block, columns_to_filter, (*filter)));
                }
                Block::erase_useless_column(block, column_to_keep);
            }
        }
        RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
        *read_rows = block->rows();
    }
    return Status::OK();
}

// Appends the child batches of the root struct batch `batch` to `result`.
//
// For ACID files (`_is_acid`), a child that is itself a struct batch is expanded one level so that
// its own fields are appended in its place; every other child is appended as is. `idx` is not used.
void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
                                orc::ColumnVectorBatch* batch, int idx) {
    const auto* root = dynamic_cast<const orc::StructVectorBatch*>(batch);
    for (auto* child : root->fields) {
        // Only ACID files get their nested struct children flattened.
        const orc::StructVectorBatch* nested =
                _is_acid ? dynamic_cast<const orc::StructVectorBatch*>(child) : nullptr;
        if (nested == nullptr) {
            result.push_back(child);
            continue;
        }
        result.insert(result.end(), nested->fields.begin(), nested->fields.end());
    }
}

void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
    // transactional hive orc delete row
    if (_delete_rows != nullptr) {
        _delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
        auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
        // todo: maybe do not need to build name to index map every time
        auto name_to_pos_map = block->get_name_to_pos_map();
        const auto& original_transaction_column = assert_cast<const ColumnInt64&>(*remove_nullable(
                block->get_by_position(
                             name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
                        .column));
        const auto& bucket_id_column = assert_cast<const ColumnInt32&>(*remove_nullable(
                block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
                        .column));
        const auto& row_id_column = assert_cast<const ColumnInt64&>(*remove_nullable(
                block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
                        .column));
        for (int i = 0; i < rows; ++i) {
            auto original_transaction = original_transaction_column.get_int(i);
            auto bucket_id = bucket_id_column.get_int(i);
            auto row_id = row_id_column.get_int(i);

            TransactionalHiveReader::AcidRowID transactional_row_id = {
                    .original_transaction = original_transaction,
                    .bucket = bucket_id,
                    .row_id = row_id};
            if (_delete_rows->contains(transactional_row_id)) {
                _pos_delete_filter_data[i] = 0;
            }
        }
    }
}

// Evaluates the filter conjuncts on a batch of predicate columns and compacts the surviving row
// indexes into a selection vector.
//
// Parameters:
//   data - ORC batch holding the predicate columns; on success its `numElements` is overwritten
//          with the number of selected rows.
//   sel  - output selection vector; the leading `data.numElements` entries (after return) are the
//          indexes of the rows that passed.
//   size - number of rows to evaluate.
//   arg  - opaque pointer that is cast to the `Block*` to fill.
//
// Side effects: fills the predicate (and, for ACID files, row-id) columns of the block, assigns
// `_filter` with the per-row result, which `_get_next_block_impl` later applies to the block, and
// adds the number of rejected rows to `_statistics.lazy_read_filtered_rows`.
//
// NOTE(review): presumably invoked by the ORC row reader from within nextBatch() during lazy
// reads — confirm against the place where this callback is registered.
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
    SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
    auto* block = (Block*)arg;
    // Remember the column count so that columns appended during evaluation can be erased.
    size_t origin_column_num = block->columns();

    // Once per conversion cycle (guarded by `_dict_cols_has_converted`), replace every dict
    // filter column in the block with an empty Int32 column, wrapped in a nullable column when
    // the original type is nullable.
    if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
        // todo: maybe do not need to build name to index map every time
        auto name_to_pos_map = block->get_name_to_pos_map();
        for (auto& dict_filter_cols : _dict_filter_cols) {
            if (!name_to_pos_map.contains(dict_filter_cols.first)) {
                return Status::InternalError("Failed to find dict filter column '{}' in block {}",
                                             dict_filter_cols.first, block->dump_structure());
            }
            MutableColumnPtr dict_col_ptr = ColumnInt32::create();
            auto pos = name_to_pos_map[dict_filter_cols.first];
            auto& column_with_type_and_name = block->get_by_position(pos);
            auto& column_type = column_with_type_and_name.type;
            if (column_type->is_nullable()) {
                block->get_by_position(pos).type =
                        std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
                block->replace_by_position(
                        pos, ColumnNullable::create(std::move(dict_col_ptr),
                                                    ColumnUInt8::create(dict_col_ptr->size(), 0)));
            } else {
                block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
                block->replace_by_position(pos, std::move(dict_col_ptr));
            }
        }
        _dict_cols_has_converted = true;
    }
    std::vector<orc::ColumnVectorBatch*> batch_vec;
    _fill_batch_vec(batch_vec, &data, 0);
    // Columns to decode now: the predicate columns, plus the ACID row columns for ACID files.
    std::vector<std::string> table_col_names;
    table_col_names.insert(table_col_names.end(), _lazy_read_ctx.predicate_columns.first.begin(),
                           _lazy_read_ctx.predicate_columns.first.end());
    if (_is_acid) {
        table_col_names.insert(table_col_names.end(),
                               TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                               TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
    }
    // todo: maybe do not need to build name to index map every time
    auto name_to_pos_map = block->get_name_to_pos_map();
    for (auto& table_col_name : table_col_names) {
        auto& column_with_type_and_name = block->get_by_position(name_to_pos_map[table_col_name]);
        auto& column_ptr = column_with_type_and_name.column;
        auto& column_type = column_with_type_and_name.type;
        auto file_column_name = _table_info_node_ptr->children_file_column_name(table_col_name);
        auto orc_col_idx = _colname_to_idx.find(file_column_name);
        if (orc_col_idx == _colname_to_idx.end()) {
            return Status::InternalError("Wrong read column '{}' in orc file", table_col_name);
        }
        RETURN_IF_ERROR(_orc_column_to_doris_column<false>(
                table_col_name, column_ptr, column_type,
                _table_info_node_ptr->get_children_node(table_col_name),
                _type_map[file_column_name], batch_vec[orc_col_idx->second], data.numElements));
    }
    RETURN_IF_ERROR(
            _fill_partition_columns(block, size, _lazy_read_ctx.predicate_partition_columns));
    RETURN_IF_ERROR(_fill_missing_columns(block, size, _lazy_read_ctx.predicate_missing_columns));
    if (_lazy_read_ctx.resize_first_column) {
        // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
        // The following process may be tricky and time-consuming, but we have no other way.
        block->get_by_position(0).column->assume_mutable()->resize(size);
    }

    // transactional hive orc delete row
    _build_delete_row_filter(block, size);

    // Start from "keep every row"; the conjunct evaluation writes the per-row result into it.
    _filter = std::make_unique<IColumn::Filter>(size, 1);
    auto* __restrict result_filter_data = _filter->data();
    bool can_filter_all = false;
    // Evaluate the generic, dict and non-dict conjuncts together, combined with the delete-row
    // filter when there is one.
    VExprContextSPtrs filter_conjuncts;
    filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
                            _filter_conjuncts.end());
    for (auto& conjunct : _dict_filter_conjuncts) {
        filter_conjuncts.emplace_back(conjunct);
    }
    for (auto& conjunct : _non_dict_filter_conjuncts) {
        filter_conjuncts.emplace_back(conjunct);
    }
    std::vector<IColumn::Filter*> filters;
    if (_delete_rows_filter_ptr) {
        filters.push_back(_delete_rows_filter_ptr.get());
    }
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
            filter_conjuncts, &filters, block, _filter.get(), &can_filter_all));

    if (_lazy_read_ctx.resize_first_column) {
        // We have to clean the first column to insert right data.
        block->get_by_position(0).column->assume_mutable()->clear();
    }

    // Every row was rejected: drop the data decoded above so the block is empty again.
    if (can_filter_all) {
        for (auto& col : table_col_names) {
            // clean block to read predicate columns and acid columns
            block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
        }
        for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
            block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
        }
        for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
            block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
        }
        Block::erase_useless_column(block, origin_column_num);
        RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
    }

    // Branch-free compaction: always write the candidate index at the current output slot, but
    // only advance the slot when the row passed the filter.
    uint16_t new_size = 0;
    for (uint16_t i = 0; i < size; i++) {
        sel[new_size] = i;
        new_size += result_filter_data[i] ? 1 : 0;
    }
    _statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
    data.numElements = new_size;
    return Status::OK();
}

// Decides, for each predicate column, whether it will be filtered through its string dictionary.
//
// Columns that qualify are recorded in `_dict_filter_cols` and their file column names are
// appended to `column_names`. For the remaining columns, the conjuncts registered for their slot
// are collected into `_non_dict_filter_conjuncts`. The object pool and all dict-filter state are
// reset first. Nothing happens when there are no per-slot conjuncts.
// `current_strip_information` is not used by this function.
Status OrcReader::fill_dict_filter_column_names(
        std::unique_ptr<orc::StripeInformation> current_strip_information,
        std::list<std::string>& column_names) {
    // Check if single slot can be filtered by dict.
    if (!_slot_id_to_filter_conjuncts) {
        return Status::OK();
    }
    _obj_pool->clear();
    _dict_filter_cols.clear();
    _dict_filter_conjuncts.clear();
    _non_dict_filter_conjuncts.clear();

    const std::list<std::string>& names = _lazy_read_ctx.predicate_columns.first;
    const std::vector<int>& slot_ids = _lazy_read_ctx.predicate_columns.second;

    // `names` and `slot_ids` are walked in lockstep: the n-th name owns the n-th slot id.
    size_t next = 0;
    for (const auto& name : names) {
        const int slot_id = slot_ids[next++];

        const bool use_dict = !_disable_dict_filter && _can_filter_by_dict(slot_id);
        if (use_dict) {
            _dict_filter_cols.emplace_back(name, slot_id);
            column_names.emplace_back(_table_info_node_ptr->children_file_column_name(name));
            continue;
        }

        const auto found = _slot_id_to_filter_conjuncts->find(slot_id);
        if (found == _slot_id_to_filter_conjuncts->end()) {
            continue;
        }
        for (const auto& ctx : found->second) {
            _non_dict_filter_conjuncts.push_back(ctx);
        }
    }
    return Status::OK();
}

bool OrcReader::_can_filter_by_dict(int slot_id) {
    SlotDescriptor* slot = nullptr;
    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
    for (auto* each : slots) {
        if (each->id() == slot_id) {
            slot = each;
            break;
        }
    }
    if (slot == nullptr) {
        return false;
    }
    if (!is_string_type(slot->type()->get_primitive_type()) &&
        !is_var_len_object(slot->type()->get_primitive_type())) {
        return false;
    }

    if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
        return false;
    }

    // TODO: The current implementation of dictionary filtering does not take into account
    //  the implementation of NULL values because the dictionary itself does not contain
    //  NULL value encoding. As a result, many NULL-related functions or expressions
    //  cannot work properly, such as is null, is not null, coalesce, etc.
    //  Here we check if the predicate expr is IN or BINARY_PRED.
    //  Implementation of NULL value dictionary filtering will be carried out later.
    return std::ranges::all_of(_slot_id_to_filter_conjuncts->at(slot_id), [&](const auto& ctx) {
        return (ctx->root()->node_type() == TExprNodeType::IN_PRED ||
                ctx->root()->node_type() == TExprNodeType::BINARY_PRED) &&
               ctx->root()->children()[0]->node_type() == TExprNodeType::SLOT_REF;
    });
}

// Pre-evaluates the conjuncts of every column in _dict_filter_cols against that column's string
// dictionary and rewrites them into predicates on dictionary codes.
//
// For each column: the dictionary values are copied into a temporary string column, the
// column's original conjuncts are executed on it, and the indexes of the entries that pass
// (the dict codes) are handed to _rewrite_dict_conjuncts(). A column is erased from
// _dict_filter_cols, and its conjuncts are appended to _non_dict_filter_conjuncts, when
//   - the map holds no dictionary for it (presumably the column is not dictionary encoded in
//     this stripe),
//   - its dictionary has no values, or
//   - more than MAX_DICT_CODE_PREDICATE_TO_REWRITE entries pass.
//
// @param file_column_name_to_dict_map dictionaries keyed by file column name.
// @param is_stripe_filtered output: true when the conjuncts of some column reject every
//                           dictionary entry; the function returns right away in that case.
// @return NotFound if a dict filter column has no conjuncts registered for its slot,
//         InternalError if its slot is missing from the tuple descriptor, or any error from
//         executing or rewriting the conjuncts.
Status OrcReader::on_string_dicts_loaded(
        std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
        bool* is_stripe_filtered) {
    SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
    *is_stripe_filtered = false;
    for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
        std::string& dict_filter_col_name = it->first;
        int slot_id = it->second;

        // Collect the conjuncts registered for this slot.
        VExprContextSPtrs ctxs;
        auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
        if (iter != _slot_id_to_filter_conjuncts->end()) {
            for (const auto& ctx : iter->second) {
                ctxs.push_back(ctx);
            }
        } else {
            std::stringstream msg;
            msg << "_slot_id_to_filter_conjuncts: slot_id [" << slot_id << "] not found";
            return Status::NotFound(msg.str());
        }

        // Gives up on dict filtering for the current column: drops it from _dict_filter_cols and
        // hands its conjuncts back to the non-dict list. `it` then points at the next column and
        // `dict_filter_col_name` must not be used any more.
        auto fall_back_to_non_dict_filter = [&]() {
            it = _dict_filter_cols.erase(it);
            for (auto& ctx : ctxs) {
                _non_dict_filter_conjuncts.emplace_back(ctx);
            }
        };

        // No dictionary was provided for this column, so it cannot be dict filtered.
        auto file_column_name_to_dict_map_iter = file_column_name_to_dict_map.find(
                _table_info_node_ptr->children_file_column_name(dict_filter_col_name));
        if (file_column_name_to_dict_map_iter == file_column_name_to_dict_map.end()) {
            fall_back_to_non_dict_filter();
            continue;
        }

        // 1. Get dictionary values to a string column.
        MutableColumnPtr dict_value_column = ColumnString::create();
        orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second;

        std::vector<StringRef> dict_values;
        size_t max_value_length = 0;
        // The value count is derived as "number of offsets - 1"; guard the subtraction so an
        // empty offset buffer is treated as an empty dictionary instead of wrapping around.
        const uint64_t offset_count = dict->dictionaryOffset.size();
        const uint64_t dictionaryCount = offset_count > 0 ? offset_count - 1 : 0;
        if (dictionaryCount == 0) {
            fall_back_to_non_dict_filter();
            continue;
        }
        dict_values.reserve(dictionaryCount);
        for (int i = 0; i < dictionaryCount; ++i) {
            char* val_ptr = nullptr;
            int64_t length = 0;
            dict->getValueByIndex(i, val_ptr, length);
            if (length > max_value_length) {
                max_value_length = length;
            }
            // Empty values are backed by EMPTY_STRING_FOR_OVERFLOW, like in the dict-to-string
            // conversion of this reader, rather than by a 1-byte "" literal.
            // NOTE(review): insert_many_strings_overflow presumably copies a fixed-size window
            // that can extend past the logical length of a value - confirm in ColumnString.
            dict_values.emplace_back((length > 0) ? val_ptr : EMPTY_STRING_FOR_OVERFLOW, length);
        }
        dict_value_column->insert_many_strings_overflow(dict_values.data(), dict_values.size(),
                                                        max_value_length);
        // Remember the size now: dict_value_column is moved into the temp block below.
        size_t dict_value_column_size = dict_value_column->size();

        // 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
        // 2.1 Build a temp block from the dict string column to match the conjuncts executing.
        //     The block has one column per slot of the tuple descriptor, in slot order; only the
        //     dict filter column carries data, all others are empty placeholders.
        Block temp_block;
        int dict_pos = -1;
        int index = 0;
        for (auto* slot_desc : _tuple_descriptor->slots()) {
            if (slot_desc->id() == slot_id) {
                auto data_type = slot_desc->get_data_type_ptr();
                if (data_type->is_nullable()) {
                    temp_block.insert(
                            {ColumnNullable::create(
                                     std::move(
                                             dict_value_column), // NOLINT(bugprone-use-after-move)
                                     ColumnUInt8::create(dict_value_column_size, 0)),
                             std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
                             ""});
                } else {
                    temp_block.insert(
                            {std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
                }
                dict_pos = index;

            } else {
                temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
                                                        slot_desc->get_data_type_ptr(),
                                                        slot_desc->col_name()));
            }
            ++index;
        }
        // Without this check a missing slot would leave dict_pos at -1 and it would be used as a
        // block position further down.
        if (dict_pos < 0) {
            return Status::InternalError(
                    "Failed to find slot {} of dict filter column '{}' in tuple descriptor",
                    slot_id, dict_filter_col_name);
        }

        // 2.2 Execute conjuncts.
        if (dict_pos != 0) {
            // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
            // The following process may be tricky and time-consuming, but we have no other way.
            temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
        }
        IColumn::Filter result_filter(temp_block.rows(), 1);
        // Initialised so that it never holds an indeterminate value, whatever execute_conjuncts
        // does with it.
        bool can_filter_all = false;
        RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block, &result_filter,
                                                        &can_filter_all));
        if (dict_pos != 0) {
            // We have to clean the first column to insert right data.
            temp_block.get_by_position(0).column->assume_mutable()->clear();
        }

        // If can_filter_all = true, can filter this stripe.
        if (can_filter_all) {
            *is_stripe_filtered = true;
            return Status::OK();
        }

        // 3. Get dict codes: the positions of the dictionary entries that passed the conjuncts.
        std::vector<int32_t> dict_codes;
        for (size_t i = 0; i < result_filter.size(); ++i) {
            if (result_filter[i]) {
                dict_codes.emplace_back(i);
            }
        }

        // About Performance: if dict_column size is too large, it will generate a large IN filter.
        if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
            fall_back_to_non_dict_filter();
            continue;
        }

        // 4. Rewrite conjuncts.
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(
                dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
        ++it;
    }
    return Status::OK();
}

// Builds a conjunct on the dictionary codes of slot `slot_id` and appends it, prepared and
// opened, to _dict_filter_conjuncts.
//
//   - exactly one code : BINARY_PRED `eq` with children [slot ref, INT literal],
//   - any other count  : VDirectInPredicate over a hybrid set of the codes, child [slot ref].
//
// @param dict_codes  dictionary codes to keep; the IN form inserts pointers to these elements
//                    into the hybrid set.
// @param slot_id     id of the slot the predicate refers to.
// @param is_nullable nullability set on the `eq` node and on its literal (the IN node is
//                    always created as non-nullable).
// @return InternalError if `slot_id` is not part of _tuple_descriptor, otherwise the status of
//         preparing/opening the new conjunct.
Status OrcReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
                                          bool is_nullable) {
    // Resolve the slot once, up front. Both predicate forms need it, and a failed lookup must
    // not reach VSlotRef::create_shared() as a null pointer.
    const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
    const auto slot_it = std::find_if(slots.begin(), slots.end(),
                                      [slot_id](auto* each) { return each->id() == slot_id; });
    if (slot_it == slots.end()) {
        return Status::InternalError(
                "Failed to find slot {} in tuple descriptor when rewriting dict conjuncts",
                slot_id);
    }
    SlotDescriptor* slot = *slot_it;

    VExprSPtr root;
    if (dict_codes.size() == 1) {
        {
            // Root node: eq(INT, INT) -> BOOLEAN.
            TFunction fn;
            TFunctionName fn_name;
            fn_name.__set_db_name("");
            fn_name.__set_function_name("eq");
            fn.__set_name(fn_name);
            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
            std::vector<TTypeDesc> arg_types;
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
            arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
            fn.__set_arg_types(arg_types);
            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
            fn.__set_has_var_args(false);

            TExprNode texpr_node;
            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
            texpr_node.__set_opcode(TExprOpcode::EQ);
            texpr_node.__set_fn(fn);
            texpr_node.__set_num_children(2);
            texpr_node.__set_is_nullable(is_nullable);
            root = VectorizedFnCall::create_shared(texpr_node);
        }
        // First child: the slot itself.
        root->add_child(VSlotRef::create_shared(slot));
        {
            // Second child: the single dictionary code as an INT literal.
            TExprNode texpr_node;
            texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
            texpr_node.__set_type(create_type_desc(TYPE_INT));
            TIntLiteral int_literal;
            int_literal.__set_value(dict_codes[0]);
            texpr_node.__set_int_literal(int_literal);
            texpr_node.__set_is_nullable(is_nullable);
            root->add_child(VLiteral::create_shared(texpr_node));
        }
    } else {
        {
            // Root node: IN predicate backed directly by a set of the codes.
            TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
            TExprNode node;
            node.__set_type(type_desc);
            node.__set_node_type(TExprNodeType::IN_PRED);
            node.in_predicate.__set_is_not_in(false);
            node.__set_opcode(TExprOpcode::FILTER_IN);
            // VdirectInPredicate assume is_nullable = false.
            node.__set_is_nullable(false);

            std::shared_ptr<HybridSetBase> hybrid_set(
                    create_set(PrimitiveType::TYPE_INT, dict_codes.size(), false));
            for (int& dict_code : dict_codes) {
                hybrid_set->insert(&dict_code);
            }
            root = vectorized::VDirectInPredicate::create_shared(node, hybrid_set);
        }
        // Only child: the slot itself.
        root->add_child(VSlotRef::create_shared(slot));
    }
    VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
    RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
    RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
    _dict_filter_conjuncts.emplace_back(rewritten_conjunct_ctx);
    return Status::OK();
}

// Replaces, inside `block`, every dict filter column (an Int32 column of dictionary codes,
// optionally wrapped in a nullable column) by a string column, and resets the column type to
// String / Nullable(String).
//
// Nothing happens unless _dict_cols_has_converted is set and _dict_filter_cols is non-empty;
// after a successful pass the flag is cleared.
//
// @param block     block whose dict filter columns are replaced in place.
// @param batch_vec ORC batches indexed through _colname_to_idx, used to decode the codes.
//                  When null, each column is replaced by an empty string column instead.
// @return InternalError if a dict filter column is missing from `block` or from
//         _colname_to_idx; Status::OK() otherwise.
Status OrcReader::_convert_dict_cols_to_string_cols(
        Block* block, const std::vector<orc::ColumnVectorBatch*>* batch_vec) {
    if (!_dict_cols_has_converted || _dict_filter_cols.empty()) {
        return Status::OK();
    }

    // todo: maybe do not need to build name to index map every time
    auto name_to_pos_map = block->get_name_to_pos_map();
    for (auto& dict_filter_col : _dict_filter_cols) {
        const std::string& col_name = dict_filter_col.first;

        const auto pos_it = name_to_pos_map.find(col_name);
        if (pos_it == name_to_pos_map.end()) {
            return Status::InternalError("Failed to find dict filter column '{}' in block {}",
                                         col_name, block->dump_structure());
        }
        auto pos = pos_it->second;
        ColumnWithTypeAndName& target = block->get_by_position(pos);
        const ColumnPtr& column = target.column;

        auto file_column_name = _table_info_node_ptr->children_file_column_name(col_name);
        auto orc_col_idx = _colname_to_idx.find(file_column_name);
        if (orc_col_idx == _colname_to_idx.end()) {
            return Status::InternalError("Wrong read column '{}' in orc file", col_name);
        }

        // Decodes the codes through the matching ORC batch, or yields an empty string column
        // when the caller supplied no batches.
        auto build_string_column = [&](const ColumnInt32* codes,
                                       const NullMap* nulls) -> MutableColumnPtr {
            if (batch_vec == nullptr) {
                return ColumnString::create();
            }
            return _convert_dict_column_to_string_column(
                    codes, nulls, (*batch_vec)[orc_col_idx->second], _type_map[file_column_name]);
        };

        const auto* nullable_column = check_and_get_column<ColumnNullable>(*column);
        if (nullable_column == nullptr) {
            // Plain column: the codes are the column itself.
            const auto* dict_column = assert_cast<const ColumnInt32*>(column.get());
            MutableColumnPtr string_column = build_string_column(dict_column, nullptr);

            target.type = std::make_shared<DataTypeString>();
            block->replace_by_position(pos, std::move(string_column));
            continue;
        }

        // Nullable column: the codes are the nested column; the null map is carried over as is.
        const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
        const auto* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
        DCHECK(dict_column);
        const NullMap& null_map = nullable_column->get_null_map_data();
        MutableColumnPtr string_column = build_string_column(dict_column, &null_map);

        target.type = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
        block->replace_by_position(
                pos, ColumnNullable::create(std::move(string_column),
                                            nullable_column->get_null_map_column_ptr()));
    }
    _dict_cols_has_converted = false;
    return Status::OK();
}

// Decodes a column of dictionary codes into a string column using the dictionary attached to
// the ORC batch `cvb`.
//
// @param dict_column     dictionary codes, one per row.
// @param null_map        optional null flags; rows flagged as null are emitted as empty
//                        strings and their code is never looked up. May be nullptr.
// @param cvb             batch that is cast to orc::EncodedStringVectorBatch to reach the
//                        dictionary.
// @param orc_column_type ORC type of the column; CHAR values are shortened with trim_right().
// @return a string column with one value per row of `dict_column`.
//
// TODO: Possible optimization point.
//  After the dict column has been filtered, its null map should no longer contain nulls, so
//  the null check could be skipped. However, with lazy materialization enabled the filter
//  column is currently not filtered first; it is filtered together with the others at the end.
MutableColumnPtr OrcReader::_convert_dict_column_to_string_column(
        const ColumnInt32* dict_column, const NullMap* null_map, orc::ColumnVectorBatch* cvb,
        const orc::Type* orc_column_type) {
    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
    auto res = ColumnString::create();
    auto* encoded_string_vector_batch = static_cast<orc::EncodedStringVectorBatch*>(cvb);
    DCHECK(encoded_string_vector_batch);

    const size_t num_values = dict_column->size();
    const int* dict_data = dict_column->get_data().data();
    std::vector<StringRef> string_values;
    string_values.reserve(num_values);
    size_t max_value_length = 0;

    // Looks up one dictionary entry, lets `adjust_length` post-process its length, and records
    // it. Zero-length results point at EMPTY_STRING_FOR_OVERFLOW rather than into the dictionary.
    auto append_entry = [&](int dict_code, auto&& adjust_length) {
        char* val_ptr;
        int64_t length;
        encoded_string_vector_batch->dictionary->getValueByIndex(dict_code, val_ptr, length);
        length = adjust_length(val_ptr, length);
        if (length > max_value_length) {
            max_value_length = length;
        }
        string_values.emplace_back((length > 0) ? val_ptr : EMPTY_STRING_FOR_OVERFLOW, length);
    };

    // Walks all rows; the null check exists only in the loop used when a null map is given.
    auto decode_all = [&](auto&& adjust_length) {
        if (null_map == nullptr) {
            for (size_t row = 0; row < num_values; ++row) {
                append_entry(dict_data[row], adjust_length);
            }
            return;
        }
        const auto* null_flags = null_map->data();
        for (size_t row = 0; row < num_values; ++row) {
            if (null_flags[row]) {
                // ORC does not fill null values in a new batch, and the former batch has been
                // released. Other types such as int/long/timestamp are flat types without
                // pointers in them, so only strings need this separate handling.
                string_values.emplace_back(EMPTY_STRING_FOR_OVERFLOW, 0);
            } else {
                append_entry(dict_data[row], adjust_length);
            }
        }
    };

    if (orc_column_type->getKind() == orc::TypeKind::CHAR) {
        // CHAR values may carry trailing padding characters which have to be stripped off.
        decode_all([&](char* val_ptr, int64_t length) { return trim_right(val_ptr, length); });
    } else {
        decode_all([&](char*, int64_t length) { return length; });
    }

    if (!string_values.empty()) {
        res->insert_many_strings_overflow(string_values.data(), num_values, max_value_length);
    }
    return res;
}

// Prepares the per-stream input streams for the stripe that is about to be read.
//
// Does nothing when _is_all_tiny_stripes is set. Otherwise the profile of the previous stripe
// streams is collected, _stripe_streams is cleared, and the byte range of every stream that
// belongs to a selected column is computed and passed to _build_input_stripe_streams().
//
// @param current_strip_information stripe offset and stream layout.
// @param selected_columns          indexed by ORC column id; true for columns being read.
// @param streams                   output: stream id -> input stream.
void ORCFileInputStream::beforeReadStripe(
        std::unique_ptr<orc::StripeInformation> current_strip_information,
        const std::vector<bool>& selected_columns,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    if (_is_all_tiny_stripes) {
        return;
    }
    _collect_profile_before_close_file_stripe();
    _stripe_streams.clear();

    std::unordered_map<orc::StreamId, io::PrefetchRange> prefetch_ranges;

    // Streams are laid out back to back from the stripe offset, so the start of each stream is
    // the end of the previous one, whether or not that stream was selected.
    uint64_t stream_begin = current_strip_information->getOffset();
    for (uint64_t idx = 0; idx < current_strip_information->getNumberOfStreams(); ++idx) {
        const std::unique_ptr<orc::StreamInformation> stream_info =
                current_strip_information->getStreamInformation(idx);
        const uint64_t column_id = stream_info->getColumnId();
        const uint64_t stream_end = stream_begin + stream_info->getLength();
        if (selected_columns[column_id]) {
            prefetch_ranges.emplace(orc::StreamId(column_id, stream_info->getKind()),
                                    doris::io::PrefetchRange {stream_begin, stream_end});
        }
        stream_begin = stream_end;
    }
    _build_input_stripe_streams(prefetch_ranges, streams);
}

// Splits the stream ranges by size and builds their input streams.
//
// Ranges no longer than _orc_once_max_read_bytes go through
// _build_small_ranges_input_stripe_streams(); all others go through
// _build_large_ranges_input_stripe_streams(). Nothing is done for an empty `ranges`.
//
// @param ranges  stream id -> byte range of the stream.
// @param streams output: stream id -> input stream.
void ORCFileInputStream::_build_input_stripe_streams(
        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    if (ranges.empty()) {
        return;
    }

    std::unordered_map<orc::StreamId, io::PrefetchRange> small_ranges;
    std::unordered_map<orc::StreamId, io::PrefetchRange> large_ranges;

    for (const auto& [stream_id, range] : ranges) {
        const bool fits_single_read =
                range.end_offset - range.start_offset <= _orc_once_max_read_bytes;
        auto& bucket = fits_single_read ? small_ranges : large_ranges;
        bucket.emplace(stream_id, range);
    }

    _build_small_ranges_input_stripe_streams(small_ranges, streams);
    _build_large_ranges_input_stripe_streams(large_ranges, streams);
}

// Builds input streams for the small stream ranges, sharing one merged-range reader between
// all streams that fall inside the same merged range.
//
// The ranges are sorted by start offset and merged with
// io::PrefetchRange::merge_adjacent_seq_ranges(). For every merged range one
// OrcMergeRangeFileReader is created (wrapped in a TracingFileReader when _io_ctx is set), and
// every stream range that starts at or after the merged start and ends at or before the merged
// end gets a StripeStreamInputStream on top of that reader.
//
// @param ranges  stream id -> byte range of the stream.
// @param streams output: stream id -> input stream; new streams are also appended to
//                _stripe_streams.
void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    using StreamRange = std::pair<orc::StreamId, io::PrefetchRange>;

    // Order the streams by where they start so each merged range maps to a contiguous run.
    std::vector<StreamRange> by_offset(ranges.begin(), ranges.end());
    std::sort(by_offset.begin(), by_offset.end(),
              [](const StreamRange& lhs, const StreamRange& rhs) {
                  return lhs.second.start_offset < rhs.second.start_offset;
              });

    std::vector<io::PrefetchRange> offsets_only;
    offsets_only.reserve(ranges.size());
    for (const auto& entry : by_offset) {
        offsets_only.push_back(entry.second);
    }
    auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
            offsets_only, _orc_max_merge_distance_bytes, _orc_once_max_read_bytes);

    for (const auto& merged_range : merged_ranges) {
        std::shared_ptr<io::FileReader> reader =
                std::make_shared<OrcMergeRangeFileReader>(_profile, _file_reader, merged_range);
        if (_io_ctx) {
            reader = std::make_shared<io::TracingFileReader>(std::move(reader),
                                                             _io_ctx->file_reader_stats);
        }

        // First stream that does not start before the merged range (binary search).
        const uint64_t merged_begin = merged_range.start_offset;
        auto cursor = std::partition_point(
                by_offset.begin(), by_offset.end(), [merged_begin](const StreamRange& entry) {
                    return entry.second.start_offset < merged_begin;
                });

        // Walk forward while the streams still start inside the merged range.
        while (cursor != by_offset.end() &&
               cursor->second.start_offset < merged_range.end_offset) {
            if (cursor->second.end_offset <= merged_range.end_offset) {
                auto stripe_stream = std::make_shared<StripeStreamInputStream>(getName(), reader,
                                                                               _io_ctx, _profile);
                streams.emplace(cursor->first, stripe_stream);
                _stripe_streams.emplace_back(stripe_stream);
            }
            ++cursor;
        }
    }
}

// Builds one input stream per large stream range, reading straight from _file_reader.
//
// Each stream gets its own reader handle: _file_reader itself, or a fresh TracingFileReader
// around it when _io_ctx is set. The byte range itself is not used here, only the stream id.
//
// @param ranges  stream id -> byte range of the stream.
// @param streams output: stream id -> input stream; new streams are also appended to
//                _stripe_streams.
void ORCFileInputStream::_build_large_ranges_input_stripe_streams(
        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
    for (const auto& entry : ranges) {
        std::shared_ptr<io::FileReader> reader;
        if (_io_ctx) {
            reader = std::make_shared<io::TracingFileReader>(_file_reader,
                                                             _io_ctx->file_reader_stats);
        } else {
            reader = _file_reader;
        }

        auto stripe_stream = std::make_shared<StripeStreamInputStream>(
                getName(), std::move(reader), _io_ctx, _profile);
        streams.emplace(entry.first, stripe_stream);
        _stripe_streams.emplace_back(stripe_stream);
    }
}

// Clears the filter entry of every row in the current batch whose row id is listed in
// _position_delete_ordered_rowids.
//
// The current batch covers the half-open row id range [start, start + nums), where `start` is
// _row_reader->getRowNumber() and `nums` is _batch->numElements. The delete list is searched
// with binary search, so it has to be sorted in ascending order (as its name says).
//
// @param filter filter of the current batch; filter[row_id - start] is set to 0 for every
//               deleted row. Does nothing when there is no delete list.
void OrcReader::_execute_filter_position_delete_rowids(IColumn::Filter& filter) {
    if (_position_delete_ordered_rowids == nullptr) {
        return;
    }
    const auto start = _row_reader->getRowNumber();
    const auto nums = _batch->numElements;

    // First deleted row id that is not before the batch.
    const auto first = std::lower_bound(_position_delete_ordered_rowids->begin(),
                                        _position_delete_ordered_rowids->end(), start);
    // First deleted row id at or past the end of the batch. Using the exclusive bound
    // `start + nums` instead of an inclusive `start + nums - 1` keeps an empty batch at row 0
    // from wrapping around and matching the whole delete list. Searching from `first` is enough
    // because the bound is never smaller than `start`.
    const auto last =
            std::lower_bound(first, _position_delete_ordered_rowids->end(), start + nums);

    for (auto it = first; it != last; ++it) {
        filter[*it - start] = 0;
    }
}

#include "common/compile_check_end.h"
} // namespace doris::vectorized
